duck.utils.fileio

FileIOStream module.

Provides both synchronous and asynchronous file streaming interfaces. It is designed for efficient reading of large files through chunked reads, and supports the standard seek, tell, and close operations in both environments.

Methods that do not need to be async: even in an async context, the following methods do not need to be asynchronous, because each is a cheap, constant-time operation:

  1. open - Time complexity is O(1)

  2. seek - Time complexity is O(1)

  3. tell - Time complexity is O(1)

In an async context, only read, write, and close need to be asynchronous.
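The split can be illustrated with a minimal sketch. It assumes the async class offloads blocking I/O to a worker thread with `asyncio.to_thread`; the module does not say how it does this. `DemoAsyncStream` is a hypothetical stand-in, not the library class: `open`, `seek`, and `tell` stay plain methods, while `read` and `close` are coroutines.

```python
import asyncio
import os
import tempfile


class DemoAsyncStream:
    """Hypothetical stream: cheap pointer operations are sync, data transfer is async."""

    def __init__(self, path: str, mode: str = "rb") -> None:
        self.path = path
        self.mode = mode
        self._fh = None

    def open(self) -> None:
        # Opening a local file does not depend on its size, so it stays synchronous.
        self._fh = open(self.path, self.mode)

    def seek(self, offset: int, whence: int = os.SEEK_SET) -> int:
        # Moving the file pointer transfers no data.
        return self._fh.seek(offset, whence)

    def tell(self) -> int:
        return self._fh.tell()

    async def read(self, size: int = -1) -> bytes:
        # A potentially large transfer: run the blocking call in a worker thread.
        return await asyncio.to_thread(self._fh.read, size)

    async def close(self) -> None:
        await asyncio.to_thread(self._fh.close)


async def main() -> bytes:
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"hello world")
    stream = DemoAsyncStream(tmp.name)
    stream.open()      # no await needed
    stream.seek(6)     # no await needed
    data = await stream.read()
    await stream.close()
    os.unlink(tmp.name)
    return data


result = asyncio.run(main())
```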

Caching: Read results are cached in a shared LRU InMemoryCache keyed by filepath:pos:size. Each unique (file, offset, length) triple has its own independent cache slot, so reads from different positions can all stay cache-warm without interfering with each other, which benefits workloads that seek frequently. Stale entries (detected via mtime) are evicted on access. On write, overlapping cache entries are patched in memory without a disk round-trip: entries that fully contain the written range have the new bytes spliced in directly; entries that lie entirely inside the written range are rebuilt from the written bytes; entries that only partially overlap at a boundary are evicted. The exact slice just written is always stored in a new cache entry.
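The read-side caching can be sketched as an LRU map whose values carry the file's mtime at caching time. This is a hypothetical stand-in for FILE_CACHE (the real InMemoryCache API is not shown here). It assumes an `OrderedDict` for LRU ordering and `os.stat(...).st_mtime` for staleness, with 0.0 used for a missing file as documented for current_mtime().

```python
import os
import tempfile
from collections import OrderedDict
from typing import Optional


class MtimeLRUCache:
    """Hypothetical LRU cache keyed by "filepath:pos:size", storing (mtime, data)."""

    def __init__(self, capacity: int = 128) -> None:
        self.capacity = capacity
        self._entries: "OrderedDict[str, tuple]" = OrderedDict()

    @staticmethod
    def make_key(filepath: str, pos: int, size: int) -> str:
        return f"{filepath}:{pos}:{size}"

    @staticmethod
    def _mtime(filepath: str) -> float:
        try:
            return os.stat(filepath).st_mtime
        except FileNotFoundError:
            return 0.0

    def get(self, filepath: str, pos: int, size: int) -> Optional[bytes]:
        key = self.make_key(filepath, pos, size)
        entry = self._entries.get(key)
        if entry is None:
            return None
        cached_mtime, data = entry
        if self._mtime(filepath) != cached_mtime:
            del self._entries[key]  # stale: the file changed since this slice was cached
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return data

    def set(self, filepath: str, pos: int, size: int, data: bytes) -> None:
        key = self.make_key(filepath, pos, size)
        self._entries[key] = (self._mtime(filepath), data)
        self._entries.move_to_end(key)
        while len(self._entries) > self.capacity:
            self._entries.popitem(last=False)  # evict the least recently used entry


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")

cache = MtimeLRUCache(capacity=2)
cache.set(tmp.name, 0, 4, b"0123")   # independent slot for offset 0
cache.set(tmp.name, 4, 4, b"4567")   # independent slot for offset 4
hit = cache.get(tmp.name, 4, 4)
miss = cache.get(tmp.name, 8, 2)     # never cached

st = os.stat(tmp.name)
os.utime(tmp.name, (st.st_atime, st.st_mtime + 10))  # simulate a modification
stale = cache.get(tmp.name, 0, 4)    # evicted on access
remaining = len(cache._entries)
os.unlink(tmp.name)
```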

Events: Hooks can be attached to on_read and on_write events via hook(). Each hook receives (stream, data, byte_count) and can be a plain callable or an async coroutine function. Async hooks on a sync stream are scheduled fire-and-forget on the running event loop when one is available.

Example:

    from duck.utils.fileio import FileIOStream

    stream = FileIOStream("data.bin", open_now=True)

    def log_read(stream, data, n):
        print(f"read {n} bytes from {stream.filepath}")

    stream.hook("on_read", log_read)
    stream.read()
    stream.close()


Package Contents

Classes

  • AsyncFileIOStream – Asynchronous file streaming class.

  • FileIOStream – Synchronous file streaming class that mimics io.IOBase.

Functions

  • to_async_fileio_stream – Converts a FileIOStream to an AsyncFileIOStream if not already async.

Data

  • FILE_CACHE

  • VALID_EVENTS

API

class duck.utils.fileio.AsyncFileIOStream(*args, **kwargs)

Bases: duck.utils.fileio.FileIOStream

Asynchronous file streaming class.

Provides async-compatible methods for reading and writing files in a non-blocking way. Shares the same LRU cache and event-hook system as FileIOStream.

Writes update the cache directly with the written bytes, matching the synchronous behaviour. Async hooks registered on this stream are awaited while the stream's async lock is held; sync hooks are called inline.

Notes:

Compatible with async context managers (async with).
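A minimal sketch of the async context manager protocol, for readers unfamiliar with it. `DemoAsyncFile` is hypothetical, and it assumes that `__aenter__` opens the file and `__aexit__` closes it; the documentation only states that `async with` is supported, not what the two methods do.

```python
import asyncio
import os
import tempfile


class DemoAsyncFile:
    """Hypothetical async stream usable with `async with`."""

    def __init__(self, path: str, mode: str = "rb") -> None:
        self.path = path
        self.mode = mode
        self._fh = None

    async def async_open(self) -> None:
        self._fh = await asyncio.to_thread(open, self.path, self.mode)

    async def close(self) -> None:
        if self._fh is not None:
            await asyncio.to_thread(self._fh.close)
            self._fh = None

    async def read(self, size: int = -1) -> bytes:
        return await asyncio.to_thread(self._fh.read, size)

    async def __aenter__(self) -> "DemoAsyncFile":
        await self.async_open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even if the body raised; returning None lets any exception propagate.
        await self.close()


async def main():
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"abc")
    async with DemoAsyncFile(tmp.name) as f:
        data = await f.read()
    closed = f._fh is None
    os.unlink(tmp.name)
    return data, closed


data, closed = asyncio.run(main())
```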

Initialization

Initialize the FileIOStream object.

Parameters:
  • filepath – Path to the file to be streamed.

  • chunk_size – Maximum number of bytes to read or write at once. Defaults to 2 MB.

  • open_now – Whether to open the file immediately. Defaults to False.

  • mode – File open mode. Defaults to 'rb'.

async __aenter__()
async __aexit__(exc_type, exc, tb)
async async_open() -> None

Asynchronously open the file.

async close() -> None

Asynchronously close the file.

async fire_hooks_async(hooks: list[Callable], data: bytes) -> None

Fires all hooks, awaiting async ones and calling sync ones inline.

Unlike the base fire_hooks(), this variant is itself a coroutine, so it can be awaited inside the async lock without scheduling fire-and-forget tasks.

Parameters:
  • hooks – The list of callables to fire.

  • data – The bytes that triggered this event.
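The dispatch rule can be sketched as a stand-alone coroutine. It is hypothetical: it detects async hooks with `inspect.iscoroutinefunction` (an assumption about how the module tells the two kinds apart) and passes `(stream, data, len(data))` as documented. Because every hook is awaited in order, all hooks finish before the caller releases its lock.

```python
import asyncio
import inspect
from typing import Any, Callable


async def fire_hooks_async(stream: Any, hooks: list, data: bytes) -> None:
    """Hypothetical version: await async hooks, call sync hooks inline, in order."""
    for fn in hooks:
        if inspect.iscoroutinefunction(fn):
            await fn(stream, data, len(data))
        else:
            fn(stream, data, len(data))


calls = []


def sync_hook(stream, data, n):
    calls.append(("sync", n))


async def async_hook(stream, data, n):
    await asyncio.sleep(0)  # yields to the loop, yet is still awaited before returning
    calls.append(("async", n))


async def main() -> None:
    lock = asyncio.Lock()
    async with lock:  # both hooks complete before the lock is released
        await fire_hooks_async(None, [sync_hook, async_hook], b"12345")


asyncio.run(main())
```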

async read(size: int = -1) -> bytes

Asynchronously read from the file.

Results are served from the LRU cache when the file has not been modified since the last read. On a cache miss the file is read in a thread and the result is stored in the cache.

Fires all on_read hooks after a successful read.

Parameters:

size – Maximum number of bytes to read; -1 reads the full content.

Returns:

Data read from file.

async write(data: bytes) -> int

Asynchronously write data to the file.

The written bytes are flushed to disk then stored in the cache under the full-read key with the post-write mtime, so the next full read is served from cache without a disk round-trip.

Fires all on_write hooks after a successful write.

Parameters:

data – Bytes to write.

Returns:

Number of bytes written.

duck.utils.fileio.FILE_CACHE: duck.utils.caching.InMemoryCache

‘InMemoryCache(…)’

class duck.utils.fileio.FileIOStream(filepath: str, chunk_size: int = 2 * 1024 * 1024, open_now: bool = False, mode: str = 'rb')

Bases: io.IOBase

Synchronous file streaming class that mimics io.IOBase.

Provides chunked reading and writing with a shared LRU cache and a simple event-hook system for on_read and on_write.

Read results are served from cache when the file is unchanged since the last read. Writes update the cache directly with the written bytes so the next full read is served from cache without an extra disk round-trip.

Hooks are registered with hook() and fired after every read or write.

Initialization

Initialize the FileIOStream object.

Parameters:
  • filepath – Path to the file to be streamed.

  • chunk_size – Maximum number of bytes to read or write at once. Defaults to 2 MB.

  • open_now – Whether to open the file immediately. Defaults to False.

  • mode – File open mode. Defaults to 'rb'.

__del__() -> None

Ensures the file is closed when the object is deleted; otherwise a RuntimeError is raised.

Always calls the synchronous base close() directly: __del__ can never be a coroutine, so it must not dispatch to the async close() override on AsyncFileIOStream.
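Why the explicit base call matters can be shown with a small sketch. `Base` and `AsyncChild` are hypothetical stand-ins for FileIOStream and AsyncFileIOStream. Calling `self.close()` from `__del__` on the subclass would only create a coroutine object that is never awaited, so nothing would actually be closed; calling `Base.close(self)` bypasses the override.

```python
import inspect


class Base:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __del__(self) -> None:
        # Bypass any subclass override: an async close() would only return a coroutine.
        Base.close(self)


class AsyncChild(Base):
    async def close(self) -> None:  # async override, unusable from __del__
        self.closed = True


child = AsyncChild()
child.__del__()                  # goes through Base.close, so the flag is really set
closed_via_del = child.closed

pending = AsyncChild().close()   # calling the override merely builds a coroutine object
is_coroutine = inspect.iscoroutine(pending)
pending.close()                  # discard it explicitly instead of leaving it un-awaited
```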

__slots__

None

_accumulate_read_bytes(data: bytes) -> None

Appends newly read bytes to the running _total_read_bytes buffer.

Parameters:

data – The bytes returned from the most recent read operation.

cache_get(size: int) -> Optional[bytes]

Returns cached bytes for the current position and size, or None.

Each (filepath, pos, size) triple has its own independent cache slot, so reads from different offsets are served correctly without interfering with each other. Stale entries (mtime mismatch) are evicted on access.

Parameters:

size – The read size passed to read(), or -1 for a full read.

Returns:

Cached bytes if the entry exists and is fresh, else None.

cache_patch_on_write(write_pos: int, data: bytes) -> None

Surgically updates every cached entry that overlaps the written region.

Rather than flushing all cached entries or re-reading the whole file, this method iterates over existing cache keys for this filepath and patches any entry whose byte range overlaps [write_pos, write_pos + len(data)). Entries that do not overlap are left untouched — they remain valid because their byte ranges were not affected by the write.

A new entry is always written for (write_pos, len(data)) so the exact slice just written is immediately cache-warm.

Entries that overlap but cannot be fully reconstructed from the in-memory write (e.g. partial overlaps at the boundary of a cached chunk) are evicted rather than storing incorrect data.

Parameters:
  • write_pos – The file offset at which the write started.

  • data – The bytes that were just written.

cache_set(pos: int, size: int, data: bytes) -> None

Stores a read result in the cache keyed by position and size.

Records the mtime on the instance so is_modified can compare against it later without an extra cache lookup.

Parameters:
  • pos – The file offset at which the read started.

  • size – The read size used to build the cache key.

  • data – The bytes to cache.

close() -> None

Close the file.

current_mtime() -> float

Returns the file’s current modification time from the OS.

Returns:

The st_mtime value for this stream’s filepath, or 0.0 if the file does not exist.

fire_hooks(hooks: list[Callable], data: bytes) -> None

Fires all hooks in the given list with (self, data, len(data)).

Sync hooks are called inline. Async hooks are scheduled as fire-and-forget tasks on the running loop, or run in a new loop when none is active.

Parameters:
  • hooks – The list of callables to fire.

  • data – The bytes that triggered this event.

get_pos() -> int

Returns the tracked stream position.

hook(event: str, fn: Callable) -> None

Registers a hook function for the given event.

The hook is called after every matching operation with the signature:

    fn(stream, data, byte_count)

where stream is this FileIOStream, data is the bytes that were read or written, and byte_count is len(data).

Both plain callables and async coroutine functions are accepted. Async hooks on a synchronous stream are scheduled fire-and-forget on the running event loop when one is available.

Parameters:
  • event – One of "on_read" or "on_write".

  • fn – The callable to register.

Raises:

ValueError – When event is not a recognised event name.
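Registration with validation can be sketched like this. `HookRegistry` is a hypothetical stand-in for the stream's hook bookkeeping. The VALID_EVENTS value is elided in this reference, so the set below is an assumption based on the two event names documented for hook().

```python
from typing import Callable, Dict, List

VALID_EVENTS = frozenset({"on_read", "on_write"})  # assumed contents


class HookRegistry:
    """Hypothetical per-stream storage for registered hooks."""

    def __init__(self) -> None:
        self._hooks: Dict[str, List[Callable]] = {event: [] for event in VALID_EVENTS}

    def hook(self, event: str, fn: Callable) -> None:
        if event not in VALID_EVENTS:
            raise ValueError(f"Unknown event {event!r}; expected one of {sorted(VALID_EVENTS)}")
        self._hooks[event].append(fn)  # hooks fire in registration order


registry = HookRegistry()
registry.hook("on_read", print)
try:
    registry.hook("on_close", print)
    rejected = False
except ValueError:
    rejected = True
```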

increment_pos(pos: int)

Increments the tracked stream position by pos. This does not seek the underlying file.

property is_modified: bool

Whether the file has been modified since this stream last read it.

Returns False when no read has occurred yet — there is no baseline mtime to compare against.

Returns:

True if the file’s current mtime differs from the mtime recorded during the last read, False otherwise.
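The mtime comparison can be sketched with a small hypothetical helper. It mirrors the documented behaviour of current_mtime() (0.0 for a missing file) and is_modified (False until a baseline has been recorded). `MtimeTracker` and `record_read` are illustrative names, not part of the module.

```python
import os
import tempfile
from typing import Optional


class MtimeTracker:
    """Hypothetical helper mirroring current_mtime() and is_modified."""

    def __init__(self, filepath: str) -> None:
        self.filepath = filepath
        self._last_read_mtime: Optional[float] = None  # no baseline until the first read

    def current_mtime(self) -> float:
        try:
            return os.stat(self.filepath).st_mtime
        except FileNotFoundError:
            return 0.0

    def record_read(self) -> None:
        self._last_read_mtime = self.current_mtime()

    @property
    def is_modified(self) -> bool:
        if self._last_read_mtime is None:
            return False
        return self.current_mtime() != self._last_read_mtime


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"data")

tracker = MtimeTracker(tmp.name)
before_read = tracker.is_modified
tracker.record_read()
unchanged = tracker.is_modified
st = os.stat(tmp.name)
os.utime(tmp.name, (st.st_atime, st.st_mtime + 5))  # simulate an external modification
changed = tracker.is_modified
os.unlink(tmp.name)
missing = tracker.current_mtime()
```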

is_open() -> bool

Check if the file is currently open.

make_cache_key(pos: int, size: int) -> str

Builds the cache key for a read starting at pos of length size.

The key encodes the filepath, position, and size so that reads from different offsets occupy independent cache slots.

Parameters:
  • pos – The file offset at which the read starts.

  • size – Number of bytes requested, or -1 for a full read.

Returns:

A string cache key of the form "filepath:pos:size".

open() -> None

Open the file using the provided mode.

raise_if_in_async_context(message: str) -> None

Raises an error with the given message if called from inside an async context.

read(size: int = -1) -> bytes

Synchronously read data from the file.

Results are served from the LRU cache when the file has not been modified since the last read. On a cache miss the file is read normally and the result is stored in the cache for future calls.

Fires all on_read hooks after a successful read.

Parameters:

size – Number of bytes to read. -1 reads all content.

Returns:

File data as bytes.
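The documentation defines chunk_size as the maximum number of bytes read at once but does not show how read() applies it. A plausible sketch, assuming read() loops over chunks of at most chunk_size bytes until it has size bytes or hits end of file, is the hypothetical helper below. It is not the library's code.

```python
import io
from typing import BinaryIO


def read_chunked(fh: BinaryIO, size: int = -1, chunk_size: int = 2 * 1024 * 1024) -> bytes:
    """Read up to `size` bytes (everything remaining if size < 0), chunk_size at a time."""
    parts = []
    remaining = size
    while remaining != 0:
        want = chunk_size if remaining < 0 else min(chunk_size, remaining)
        chunk = fh.read(want)
        if not chunk:  # end of file
            break
        parts.append(chunk)
        if remaining > 0:
            remaining -= len(chunk)
    return b"".join(parts)


full = read_chunked(io.BytesIO(b"abcdefghij"), -1, chunk_size=3)
buf = io.BytesIO(b"abcdefghij")
partial = read_chunked(buf, 5, chunk_size=2)
position_after = buf.tell()
```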

seek(offset: int, whence: int = os.SEEK_SET) -> None

Move the file pointer to a new location.

tell() -> int

Get the current position in the file.

update_pos(pos: int)

Sets the tracked stream position to pos. This does not seek the underlying file.

write(data: bytes) -> int

Synchronously write data to the file.

The written bytes are flushed to disk immediately, then stored in the cache under the full-read key (size=-1) with the post-write mtime. This means the next full read is served from cache without a disk round-trip.

Fires all on_write hooks after a successful write.

Parameters:

data – Data to write.

Returns:

Number of bytes written.

duck.utils.fileio.VALID_EVENTS

‘frozenset(…)’

duck.utils.fileio.to_async_fileio_stream(fileio_stream: FileIOStream) -> AsyncFileIOStream

Converts a FileIOStream to an AsyncFileIOStream if not already async.

Parameters:

fileio_stream – The synchronous stream to convert.

Returns:

An AsyncFileIOStream wrapping the same underlying file descriptor.
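The reference states that the result wraps the same underlying file descriptor, but not how this is achieved. One hypothetical approach is to build the async instance without calling `__init__` and copy the instance state across, so the open file object and its position are shared rather than reopened. `SyncStream`, `AsyncStream`, and `to_async` are stand-ins, and the `__dict__` copy assumes the classes do not store their state only in `__slots__`.

```python
class SyncStream:
    def __init__(self, path: str) -> None:
        self.path = path
        self.fh = None  # would hold the open file object


class AsyncStream(SyncStream):
    pass


def to_async(stream: SyncStream) -> AsyncStream:
    """Hypothetical conversion that reuses the existing state instead of reopening."""
    if isinstance(stream, AsyncStream):
        return stream  # already async: returned unchanged
    converted = AsyncStream.__new__(AsyncStream)  # skip __init__ so nothing is reopened
    converted.__dict__.update(stream.__dict__)    # share the same file object and position
    return converted


sync_stream = SyncStream("data.bin")
sync_stream.fh = object()  # placeholder for an open file object
async_stream = to_async(sync_stream)
```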