duck.utils.fileio
FileIOStream module.
Provides both synchronous and asynchronous file streaming interfaces.
Ideal for efficient reading of large files using chunked reads and supporting
standard seek, tell, and close operations in both environments.
Methods that do not need to be async:
Even in an async context, the following methods do not need to be asynchronous, because each runs in constant time:
- open: time complexity is O(1)
- seek: time complexity is O(1)
- tell: time complexity is O(1)
In an async context, only read, write, and close need to be asynchronous.
Caching:
Read results are cached in a shared LRU InMemoryCache keyed by
filepath:pos:size. Each unique (file, offset, length) triple has its
own independent cache slot, so reads from different positions are all
cache-warm without interfering with each other — beneficial in environments
that seek frequently. Stale entries (detected via mtime) are evicted on
access. On write, overlapping cache entries are patched in memory without a
disk round-trip: entries that fully contain the written range have the new
bytes spliced in; entries whose content lies entirely inside the written
range are reconstructed in place from the written bytes; boundary-partial
overlaps are evicted. The exact slice just written is always stored in a
new cache entry.
Events:
Hooks can be attached to on_read and on_write events via hook().
Each hook receives (stream, data, byte_count) and can be a plain callable
or an async coroutine function. Async hooks on a sync stream are scheduled
fire-and-forget on the running event loop when one is available.
Example::
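    from duck.utils.fileio import FileIOStream

    stream = FileIOStream("data.bin", open_now=True)

    # Hooks receive the stream, the bytes read, and the byte count.
    def log_read(stream, data, n):
        print(f"read {n} bytes from {stream.filepath}")

    stream.hook("on_read", log_read)
    stream.read()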
Package Contents
Classes
AsyncFileIOStream – Asynchronous file streaming class.
FileIOStream – Synchronous file streaming class that mimics io.IOBase.
Functions
to_async_fileio_stream – Converts a FileIOStream to an AsyncFileIOStream if not already async.
Data
FILE_CACHE
VALID_EVENTS
API
- class duck.utils.fileio.AsyncFileIOStream(*args, **kwargs)
Bases: duck.utils.fileio.FileIOStream
Asynchronous file streaming class.
Provides async-compatible methods for reading and writing files in a non-blocking way. Shares the same LRU cache and event-hook system as FileIOStream.
Writes update the cache directly with the written bytes, matching the synchronous behaviour. Async hooks registered on this stream are awaited inside the lock; sync hooks are called inline.
Notes:
Compatible with async context managers (async with).
Initialization
Initialize the FileIOStream object.
- Parameters:
filepath – Path to the file to be streamed.
chunk_size – Maximum number of bytes to read or write at once. Defaults to 2 MiB (2 * 1024 * 1024 bytes).
open_now – Whether to open the file immediately. Defaults to False.
mode – File open mode. Defaults to 'rb'.
- async fire_hooks_async(hooks: list[Callable], data: bytes) → None
Fires all hooks, awaiting async ones and calling sync ones inline.
Unlike the base fire_hooks, this variant is itself a coroutine, so it can be awaited inside the async lock without scheduling fire-and-forget tasks.
- Parameters:
hooks – The list of callables to fire.
data – The bytes that triggered this event.
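The dispatch described above might be sketched as follows. This is only an illustration: the name dispatch_hooks, the use of inspect.iscoroutinefunction to tell the two hook kinds apart, and the None stream placeholder are assumptions, not the module's actual code.

```python
import asyncio
import inspect


async def dispatch_hooks(stream, hooks, data):
    """Hypothetical helper: await coroutine hooks, call plain hooks inline."""
    for fn in hooks:
        if inspect.iscoroutinefunction(fn):
            await fn(stream, data, len(data))
        else:
            fn(stream, data, len(data))


calls = []


def sync_hook(stream, data, n):
    calls.append(("sync", n))


async def async_hook(stream, data, n):
    await asyncio.sleep(0)  # yield to the loop once, as a real async hook might
    calls.append(("async", n))


# None stands in for a stream object in this standalone sketch.
asyncio.run(dispatch_hooks(None, [sync_hook, async_hook], b"abc"))
```

Because every hook finishes before the coroutine returns, hooks run in registration order and any exception surfaces to the caller rather than being lost in a background task.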
- async read(size: int = -1) → bytes
Asynchronously read from the file.
Results are served from the LRU cache when the file has not been modified since the last read. On a cache miss, the file is read in a thread and the result is stored in the cache.
Fires all on_read hooks after a successful read.
- Parameters:
size – Maximum number of bytes to read. -1 reads the full content.
- Returns:
Data read from file.
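Reading "in a thread" on a cache miss could look roughly like the sketch below. It assumes asyncio.to_thread (Python 3.9+) as the offloading mechanism; the helper names blocking_read and read_in_thread are hypothetical, and the module may use a different executor.

```python
import asyncio
import os
import tempfile


def blocking_read(path, pos, size):
    """Plain blocking read of `size` bytes at offset `pos` (-1 reads to EOF)."""
    with open(path, "rb") as f:
        f.seek(pos)
        return f.read(size)


async def read_in_thread(path, pos=0, size=-1):
    # Offload the blocking call so the event loop stays responsive.
    return await asyncio.to_thread(blocking_read, path, pos, size)


# Demo against a throwaway file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"hello world")

try:
    result = asyncio.run(read_in_thread(tmp.name, pos=6, size=5))
finally:
    os.remove(tmp.name)
```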
- async write(data: bytes) → int
Asynchronously write data to the file.
The written bytes are flushed to disk, then stored in the cache under the full-read key with the post-write mtime, so the next full read is served from cache without a disk round-trip.
Fires all on_write hooks after a successful write.
- Parameters:
data – Bytes to write.
- Returns:
Number of bytes written.
- duck.utils.fileio.FILE_CACHE: duck.utils.caching.InMemoryCache
‘InMemoryCache(…)’
- class duck.utils.fileio.FileIOStream(filepath: str, chunk_size: int = 2 * 1024 * 1024, open_now: bool = False, mode: str = 'rb')
Bases: io.IOBase
Synchronous file streaming class that mimics io.IOBase.
Provides chunked reading and writing with a shared LRU cache and a simple event-hook system for on_read and on_write.
Read results are served from cache when the file is unchanged since the last read. Writes update the cache directly with the written bytes so the next full read is served from cache without an extra disk round-trip.
Hooks are registered with hook() and fired after every read or write.
Initialization
Initialize the FileIOStream object.
- Parameters:
filepath – Path to the file to be streamed.
chunk_size – Maximum number of bytes to read or write at once. Defaults to 2 MiB (2 * 1024 * 1024 bytes).
open_now – Whether to open the file immediately. Defaults to False.
mode – File open mode. Defaults to 'rb'.
- __del__() → None
Ensures the file is closed when the object is deleted; otherwise a RuntimeError is raised.
Always calls the synchronous base close directly: __del__ can never be a coroutine, so it must not dispatch to the async override on AsyncFileIOStream.
- __slots__
None
- _accumulate_read_bytes(data: bytes) → None
Appends newly read bytes to the running _total_read_bytes buffer.
- Parameters:
data – The bytes returned from the most recent read operation.
- cache_get(size: int) → Optional[bytes]
Returns cached bytes for the current position and size, or None.
Each (filepath, pos, size) triple has its own independent cache slot, so reads from different offsets are served correctly without interfering with each other. Stale entries (mtime mismatch) are evicted on access.
- Parameters:
size – The read size passed to read(), or -1 for a full read.
- Returns:
Cached bytes if the entry exists and is fresh, else None.
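A minimal sketch of an mtime-validated lookup of this kind is shown below. It assumes a plain dict in place of the shared InMemoryCache and stores the mtime next to each entry; the free functions here are illustrative stand-ins, not the methods' real bodies.

```python
import os
import tempfile

cache = {}  # key -> (mtime, data); a plain dict stands in for the LRU cache


def current_mtime(path):
    try:
        return os.stat(path).st_mtime
    except FileNotFoundError:
        return 0.0


def cache_set(path, pos, size, data):
    cache[f"{path}:{pos}:{size}"] = (current_mtime(path), data)


def cache_get(path, pos, size):
    key = f"{path}:{pos}:{size}"
    entry = cache.get(key)
    if entry is None:
        return None
    mtime, data = entry
    if mtime != current_mtime(path):
        del cache[key]  # stale: the file changed after this entry was stored
        return None
    return data


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
path = tmp.name

cache_set(path, 0, 4, b"0123")
hit = cache_get(path, 0, 4)    # same (pos, size): served from the cache
other = cache_get(path, 4, 4)  # different offset: its own slot, so a miss

# Simulate an external modification by bumping the file's mtime.
st = os.stat(path)
os.utime(path, (st.st_atime, st.st_mtime + 10))
stale = cache_get(path, 0, 4)  # mtime mismatch: evicted on access
os.remove(path)
```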
- cache_patch_on_write(write_pos: int, data: bytes) → None
Surgically updates every cached entry that overlaps the written region.
Rather than flushing all cached entries or re-reading the whole file, this method iterates over existing cache keys for this filepath and patches any entry whose byte range overlaps [write_pos, write_pos + len(data)). Entries that do not overlap are left untouched; they remain valid because their byte ranges were not affected by the write.
A new entry is always written for (write_pos, len(data)) so the exact slice just written is immediately cache-warm.
Entries that overlap but cannot be fully reconstructed from the in-memory write (e.g. partial overlaps at the boundary of a cached chunk) are evicted rather than storing incorrect data.
- Parameters:
write_pos – The file offset at which the write started.
data – The bytes that were just written.
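One possible reading of these rules is sketched below. It assumes entries are held in a dict keyed by (pos, size), takes each entry's range from the length of its cached bytes, and ignores full-read (size=-1) keys and mtime bookkeeping; patch_on_write is a hypothetical name, and the real method may classify overlaps differently.

```python
def patch_on_write(entries, write_pos, data):
    """Patch `entries` ({(pos, size): bytes}) in place after a write."""
    w_start, w_end = write_pos, write_pos + len(data)
    for key in list(entries):
        e_start = key[0]
        cached = entries[key]
        e_end = e_start + len(cached)
        if e_end <= w_start or w_end <= e_start:
            continue  # no overlap: the entry is still valid
        if e_start <= w_start and w_end <= e_end:
            # The write lies inside the entry: splice the new bytes in.
            lo = w_start - e_start
            entries[key] = cached[:lo] + data + cached[lo + len(data):]
        elif w_start <= e_start and e_end <= w_end:
            # The entry lies inside the write: rebuild it from the written bytes.
            entries[key] = data[e_start - w_start:e_end - w_start]
        else:
            del entries[key]  # boundary-partial overlap: evict
    # The exact slice just written always gets its own entry.
    entries[(write_pos, len(data))] = data


# File content is b"abcdefghij"; b"XYZ" is written at offset 3.
entries = {
    (0, 10): b"abcdefghij",  # contains the write: spliced
    (4, 2): b"ef",           # inside the write: rebuilt
    (5, 3): b"fgh",          # straddles the end of the write: evicted
    (7, 3): b"hij",          # no overlap: untouched
}
patch_on_write(entries, 3, b"XYZ")
```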
- cache_set(pos: int, size: int, data: bytes) → None
Stores a read result in the cache keyed by position and size.
Records the mtime on the instance so is_modified can compare against it later without an extra cache lookup.
- Parameters:
pos – The file offset at which the read started.
size – The read size used to build the cache key.
data – The bytes to cache.
- current_mtime() → float
Returns the file’s current modification time from the OS.
- Returns:
The st_mtime value for this stream’s filepath, or 0.0 if the file does not exist.
- fire_hooks(hooks: list[Callable], data: bytes) → None
Fires all hooks in the given list with (self, data, len(data)).
Sync hooks are called inline. Async hooks are scheduled as fire-and-forget tasks on the running loop, or run in a new loop when none is active.
- Parameters:
hooks – The list of callables to fire.
data – The bytes that triggered this event.
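The scheduling rule for a synchronous caller can be sketched as follows. The function name fire_hooks_sketch and the use of asyncio.get_running_loop to detect a loop are assumptions made for illustration; the library's own detection logic is not shown in this reference.

```python
import asyncio
import inspect


def fire_hooks_sketch(stream, hooks, data):
    """Hypothetical stand-in for a sync hook dispatcher."""
    for fn in hooks:
        if not inspect.iscoroutinefunction(fn):
            fn(stream, data, len(data))  # sync hook: call inline
            continue
        coro = fn(stream, data, len(data))
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(coro)  # no loop is running: run it in a new one
        else:
            loop.create_task(coro)  # fire-and-forget on the running loop


seen = []


def on_sync(stream, data, n):
    seen.append(("sync", n))


async def on_async(stream, data, n):
    seen.append(("async", n))


# Called outside any event loop, so the async hook runs in a fresh loop.
fire_hooks_sketch(None, [on_sync, on_async], b"abcd")
```

In the fire-and-forget branch the caller never awaits the task, so long-lived code would usually keep a reference to it so it is not garbage-collected before it completes.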
- hook(event: str, fn: Callable) → None
Registers a hook function for the given event.
The hook is called after every matching operation with the signature::
    fn(stream, data, byte_count)
where stream is this FileIOStream, data is the bytes that were read or written, and byte_count is len(data).
Both plain callables and async coroutine functions are accepted. Async hooks on a synchronous stream are scheduled fire-and-forget on the running event loop when one is available.
- Parameters:
event – One of "on_read" or "on_write".
fn – The callable to register.
- Raises:
ValueError – When event is not a recognised event name.
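Registration with event-name validation might be sketched like this. HookRegistry and KNOWN_EVENTS are hypothetical names used only for this example; the event names come from the parameter description above, and the storage layout is an assumption.

```python
# Hypothetical stand-in for the module's set of valid event names.
KNOWN_EVENTS = frozenset({"on_read", "on_write"})


class HookRegistry:
    """Illustrative registry: one list of callables per known event."""

    def __init__(self):
        self._hooks = {event: [] for event in KNOWN_EVENTS}

    def hook(self, event, fn):
        if event not in KNOWN_EVENTS:
            raise ValueError(f"unknown event: {event!r}")
        self._hooks[event].append(fn)


registry = HookRegistry()
registry.hook("on_read", print)

try:
    registry.hook("on_close", print)  # not a recognised event name
    rejected = False
except ValueError:
    rejected = True
```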
- property is_modified: bool
Whether the file has been modified since this stream last read it.
Returns False when no read has occurred yet, because there is no baseline mtime to compare against.
- Returns:
True if the file’s current mtime differs from the mtime recorded during the last read, False otherwise.
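The baseline rule can be illustrated with a small standalone tracker. MtimeTracker, record_read, and last_mtime are hypothetical names; the sketch only assumes that the recorded value is the file's st_mtime at the time of the last read.

```python
import os
import tempfile


class MtimeTracker:
    """Illustrative tracker for the 'modified since last read' check."""

    def __init__(self, path):
        self.path = path
        self.last_mtime = None  # no baseline until the first read

    def record_read(self):
        self.last_mtime = os.stat(self.path).st_mtime

    @property
    def is_modified(self):
        if self.last_mtime is None:
            return False  # nothing to compare against yet
        return os.stat(self.path).st_mtime != self.last_mtime


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"data")

tracker = MtimeTracker(tmp.name)
before_read = tracker.is_modified  # no read yet
tracker.record_read()
after_read = tracker.is_modified   # unchanged since the read

# Simulate an external modification by bumping the mtime.
st = os.stat(tmp.name)
os.utime(tmp.name, (st.st_atime, st.st_mtime + 10))
after_touch = tracker.is_modified
os.remove(tmp.name)
```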
- make_cache_key(pos: int, size: int) → str
Builds the cache key for a read starting at pos of length size.
The key encodes the filepath, position, and size so that reads from different offsets occupy independent cache slots.
- Parameters:
pos – The file offset at which the read starts.
size – Number of bytes requested, or -1 for a full read.
- Returns:
A string cache key of the form "filepath:pos:size".
- raise_if_in_async_context(message: str) → None
Raises an error if used inside an async context.
- read(size: int = -1) → bytes
Synchronously read data from the file.
Results are served from the LRU cache when the file has not been modified since the last read. On a cache miss, the file is read normally and the result is stored in the cache for future calls.
Fires all on_read hooks after a successful read.
- Parameters:
size – Number of bytes to read. -1 reads all content.
- Returns:
File data as bytes.
- write(data: bytes) → int
Synchronously write data to the file.
The written bytes are flushed to disk immediately, then stored in the cache under the full-read key (size=-1) with the post-write mtime. This means the next full read is served from cache without a disk round-trip.
Fires all on_write hooks after a successful write.
- Parameters:
data – Data to write.
- Returns:
Number of bytes written.
- duck.utils.fileio.VALID_EVENTS
‘frozenset(…)’
- duck.utils.fileio.to_async_fileio_stream(fileio_stream: FileIOStream) → AsyncFileIOStream
Converts a FileIOStream to an AsyncFileIOStream if not already async.
- Parameters:
fileio_stream – The synchronous stream to convert.
- Returns:
An AsyncFileIOStream wrapping the same underlying file descriptor.