binary_ensemble.stream¶
The stream module is the low-level API for plain .ben and .xben files. A plain stream
contains assignments only; it does not carry a graph, metadata, or custom assets. Prefer
binary_ensemble.bundle unless some other tool specifically needs raw stream files.
Stream vs. bundle¶
Need |
Use |
|---|---|
Self-describing file with graph and metadata |
|
Small raw stream for another tool |
|
Whole-file JSONL conversion |
Plain BEN/XBEN stream encoding and decoding.
BenEncoder writes a plain .ben stream; BenDecoder iterates a plain .ben /
.xben stream. Both are stream-only: opening a decoder on a .bendl bundle, or trying to
read bundle assets, raises and points you at binary_ensemble.bundle. For the recommended
single-file bundle format, use binary_ensemble.bundle.BendlEncoder /
binary_ensemble.bundle.BendlDecoder.
Encoder¶
BenEncoder writes .ben streams. It does not write .xben directly; encode to BEN first,
then call binary_ensemble.codec.encode_ben_to_xben() for archival compression.
from binary_ensemble import BenEncoder
with BenEncoder("api-chain.ben", overwrite=True, variant="twodelta") as encoder:
encoder.write([1, 1, 2, 2])
encoder.write([1, 2, 2, 2])
Variant choices are documented in Encoding variants. Decoders auto-detect the variant from the stream banner, so you only choose it when encoding.
- class BenEncoder(file_path, overwrite=False, variant='twodelta')¶
Bases:
objectEncoder for plain Binary Ensemble (
.ben) streams.Write assignments one at a time with
write(), thenclose()to flush and finish the file. The encoder is a context manager, so the idiomatic pattern is:with BenEncoder("plans.ben", overwrite=True) as enc: for assignment in plans: enc.write(assignment)
This produces a plain BEN stream with no bundle framing. To produce a self-describing
.bendlbundle (with an embedded graph, metadata, or other assets) useBendlEncoderinstead.- Parameters:
file_path (StrPath) – Output path (
stroros.PathLike). Must not exist unlessoverwrite=True.overwrite (bool, optional) – Replace an existing file at
file_path. Default isFalse.variant (Variant, optional) – BEN encoding variant for the stream:
"standard","mkv_chain", or"twodelta". Default is"twodelta".
- Raises:
OSError – If
file_pathexists andoverwriteisFalse, or it cannot be created.ValueError – If
variantis not a recognized variant name.
- close()¶
Flush the assignment stream and close the underlying file. Idempotent.
Decoder¶
BenDecoder iterates plain .ben or .xben streams. Use mode="xben" for XBEN:
from binary_ensemble import BenDecoder
ben_decoder = BenDecoder("chain.ben")
xben_decoder = BenDecoder("chain.xben", mode="xben")
assert len(ben_decoder) == len(xben_decoder)
The same subsampling methods available on bundles are available here:
from binary_ensemble import BenDecoder
for assignment in BenDecoder("chain.ben").subsample_every(25):
print(assignment[:4])
Plain BEN is the fastest format for repeated reads and subsampling. XBEN is smaller, but it pays a decompression startup cost.
- class BenDecoder(file_path, mode='ben')¶
Bases:
objectIterator over the assignments in a plain BEN or XBEN stream.
Iterate the decoder to yield one assignment at a time, each a
list[int]of district ids in dual-graph node order.len()reports the (expanded) sample count and is cheap to call, so it is safe to use for a progress bar. The encoding variant is detected automatically from the stream, so it is never passed when reading.This decoder is stream-only: opening it on a
.bendlbundle raises and points the caller atBendlDecoder, which carries the bundle inspection surface (assets, embedded graph, metadata). This mirrors thebenvsbendlsplit of the command-line tools.- Parameters:
file_path (StrPath) – Path to the input
.benor.xbenfile (stroros.PathLike).mode (AssignmentFormat, optional) – Which reader to use:
"ben"or"xben". Opening an XBEN stream warns about a one-time decompression startup cost. Default is"ben".
- Raises:
Exception – If
file_pathis a.bendlbundle (useBendlDecoderinstead), ormodedoes not match the file’s actual format.OSError – If the file cannot be opened or its banner is malformed.
Example
>>> from binary_ensemble import BenDecoder >>> for assignment in BenDecoder("plans.ben"): ... print(assignment[:8])
- assignment_format()¶
Return the container format of the underlying stream as
"ben"or"xben".
- count_samples()¶
Count the samples in the stream.
The result is the expanded sample count: a frame that repeats five identical samples contributes five. The first call walks the stream to count; the result is cached, so repeated calls (and
len()) are cheap afterwards.- Returns:
int – The number of samples in the stream.
- subsample_every(step, offset=1)¶
Restrict iteration to every
step-th sample.- Parameters:
- Returns:
BenDecoder –
self, for chaining into aforloop.- Raises:
Exception – If
steporoffsetis0(both are 1-based).
Example
>>> for plan in BenDecoder("plans.ben").subsample_every(1000): ... ...
- subsample_indices(indices, /)¶
Restrict iteration to the samples at the given 1-indexed positions.
Skipped samples are never materialized as Python lists, and where the encoding variant allows it (
standard,mkv_chain) whole frames are skipped without being unpacked, so this stays fast on large ensembles.- Parameters:
indices (Sequence[int]) – The 1-indexed sample numbers to keep. Duplicates are dropped; an unsorted list is sorted, with a
UserWarning.- Returns:
BenDecoder –
self, so the call can be chained directly into aforloop.- Raises:
Exception – If
indicesis empty, contains0(indices are 1-based), or contains an index greater than the number of samples in the stream.
Example
>>> for plan in BenDecoder("plans.ben").subsample_indices([1, 500, 9999]): ... ...
- subsample_range(start, end, /)¶
Restrict iteration to a contiguous, 1-indexed inclusive range of samples.
- Parameters:
- Returns:
BenDecoder –
self, for chaining into aforloop.- Raises:
Exception – If
startis0,endis less thanstart, orendis greater than the number of samples in the stream.
Example
>>> list(BenDecoder("plans.ben").subsample_range(10, 15)) # samples 10, 11, 12, 13, 14, and 15