binary_ensemble.stream

The stream module is the low-level API for plain .ben and .xben files. A plain stream contains assignments only; it does not carry a graph, metadata, or custom assets. Prefer binary_ensemble.bundle unless some other tool specifically needs raw stream files.

Stream vs. bundle

Need                                            Use
Self-describing file with graph and metadata    BendlEncoder / BendlDecoder
Small raw stream for another tool               BenEncoder / BenDecoder
Whole-file JSONL conversion                     binary_ensemble.codec

Plain BEN/XBEN stream encoding and decoding.

BenEncoder writes a plain .ben stream; BenDecoder iterates over a plain .ben / .xben stream. Both are stream-only: opening a decoder on a .bendl bundle, or trying to read bundle assets, raises an error that points you to binary_ensemble.bundle. For the recommended single-file bundle format, use binary_ensemble.bundle.BendlEncoder / binary_ensemble.bundle.BendlDecoder.

Encoder

BenEncoder writes .ben streams. It does not write .xben directly; encode to BEN first, then call binary_ensemble.codec.encode_ben_to_xben() for archival compression.

from binary_ensemble import BenEncoder

with BenEncoder("api-chain.ben", overwrite=True, variant="twodelta") as encoder:
    encoder.write([1, 1, 2, 2])
    encoder.write([1, 2, 2, 2])

Variant choices are documented in Encoding variants. Decoders auto-detect the variant from the stream banner, so you only choose it when encoding.

class BenEncoder(file_path, overwrite=False, variant='twodelta')

Bases: object

Encoder for plain Binary Ensemble (.ben) streams.

Write assignments one at a time with write(), then call close() to flush and finish the file. The encoder is a context manager, so the idiomatic pattern is:

with BenEncoder("plans.ben", overwrite=True) as enc:
    for assignment in plans:
        enc.write(assignment)

This produces a plain BEN stream with no bundle framing. To produce a self-describing .bendl bundle (with an embedded graph, metadata, or other assets) use BendlEncoder instead.

Parameters:
  • file_path (StrPath) – Output path (str or os.PathLike). Must not exist unless overwrite=True.

  • overwrite (bool, optional) – Replace an existing file at file_path. Default is False.

  • variant (Variant, optional) – BEN encoding variant for the stream: "standard", "mkv_chain", or "twodelta". Default is "twodelta".

Raises:
  • OSError – If file_path exists and overwrite is False, or it cannot be created.

  • ValueError – If variant is not a recognized variant name.

close()

Flush the assignment stream and close the underlying file. Idempotent.

write(assignment)

Encode a single assignment and append it to the output stream.

Parameters:

assignment (Sequence[int]) – The plan as a sequence of district ids (e.g. list[int]), one per node in dual-graph node order.

Raises:

OSError – If the encoder has already been closed, or the write fails.

Decoder

BenDecoder iterates plain .ben or .xben streams. Use mode="xben" for XBEN:

from binary_ensemble import BenDecoder

ben_decoder = BenDecoder("chain.ben")
xben_decoder = BenDecoder("chain.xben", mode="xben")

assert len(ben_decoder) == len(xben_decoder)

The same subsampling methods available on bundles are available here:

from binary_ensemble import BenDecoder

for assignment in BenDecoder("chain.ben").subsample_every(25):
    print(assignment[:4])

Plain BEN is the fastest format for repeated reads and subsampling. XBEN is smaller, but it pays a decompression startup cost.
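As a rough sketch of how the two classes fit together, the helper below writes a few assignments to a plain .ben file and reads them back. It uses only the calls shown in this section. The function name round_trip and the temporary path are illustrative choices, not part of the library, and the snippet assumes the binary_ensemble package is installed.

```python
import os
import tempfile


def round_trip(assignments, variant="twodelta"):
    """Write assignments to a temporary .ben file and read them back.

    Hypothetical helper for illustration; not part of binary_ensemble.
    """
    # Imported inside the function so that defining the helper does not
    # itself require the package to be installed.
    from binary_ensemble import BenDecoder, BenEncoder

    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "round-trip.ben")
        with BenEncoder(path, overwrite=True, variant=variant) as encoder:
            for assignment in assignments:
                encoder.write(assignment)
        # The decoder detects the variant from the stream, so none is passed.
        return [list(assignment) for assignment in BenDecoder(path)]
```

If the stream round-trips as described, round_trip([[1, 1, 2, 2], [1, 2, 2, 2]]) should give back the same two lists.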

class BenDecoder(file_path, mode='ben')

Bases: object

Iterator over the assignments in a plain BEN or XBEN stream.

Iterate the decoder to yield one assignment at a time, each a list[int] of district ids in dual-graph node order. len() reports the (expanded) sample count and is cheap to call, so it is safe to use for a progress bar. The encoding variant is detected automatically from the stream, so it is never passed when reading.
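Because the decoder supports len(), it can drive a simple progress report. The generator below is a minimal sketch that works with any sized iterable; with_progress and report_every are hypothetical names, and the snippet is exercised on a plain list standing in for a decoder.

```python
def with_progress(sized_iterable, report_every=1000):
    """Yield items unchanged, printing a progress line every few items."""
    total = len(sized_iterable)  # for a decoder: the expanded sample count
    for position, item in enumerate(sized_iterable, start=1):
        if position % report_every == 0 or position == total:
            print(f"{position}/{total} samples")
        yield item


# Stand-in for a decoder: a list of three assignments.
plans = [[1, 1, 2, 2], [1, 2, 2, 2], [2, 2, 1, 1]]
seen = list(with_progress(plans, report_every=2))
```

With a real stream, the list would be replaced by something like BenDecoder("plans.ben").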

This decoder is stream-only: opening it on a .bendl bundle raises an error that points the caller to BendlDecoder, which carries the bundle inspection surface (assets, embedded graph, metadata). This mirrors the ben vs. bendl split of the command-line tools.

Parameters:
  • file_path (StrPath) – Path to the input .ben or .xben file (str or os.PathLike).

  • mode (AssignmentFormat, optional) – Which reader to use: "ben" or "xben". Opening an XBEN stream warns about a one-time decompression startup cost. Default is "ben".

Raises:
  • Exception – If file_path is a .bendl bundle (use BendlDecoder instead), or mode does not match the file’s actual format.

  • OSError – If the file cannot be opened or its banner is malformed.

Example

>>> from binary_ensemble import BenDecoder
>>> for assignment in BenDecoder("plans.ben"):
...     print(assignment[:8])

assignment_format()

Return the container format of the underlying stream as "ben" or "xben".

count_samples()

Count the samples in the stream.

The result is the expanded sample count: a frame that repeats five identical samples contributes five. The first call walks the stream to count; the result is cached, so repeated calls (and len()) are cheap afterwards.

Returns:

int – The number of samples in the stream.
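To make "expanded" concrete, the sketch below models a stream as a list of (assignment, repeat count) frames. This in-memory representation is an assumption made for illustration only; it is not the on-disk BEN layout.

```python
# Hypothetical frames: each pairs one assignment with how often it repeats.
frames = [
    ([1, 1, 2, 2], 1),
    ([1, 2, 2, 2], 5),  # five identical samples stored once
    ([2, 2, 1, 1], 2),
]


def expanded_count(frames):
    """Number of samples once every repeat is counted individually."""
    return sum(repeats for _, repeats in frames)


def expand(frames):
    """Yield every sample, repeating each assignment as its frame says."""
    for assignment, repeats in frames:
        for _ in range(repeats):
            yield list(assignment)


print(len(frames), expanded_count(frames))  # 3 frames, 8 samples
```

In this model, iterating yields eight assignments even though only three frames are stored, which is the number count_samples() is described as reporting.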

subsample_every(step, offset=1)

Restrict iteration to every step-th sample.

Parameters:
  • step (int) – Stride between kept samples (e.g. 10 keeps every tenth sample).

  • offset (int, optional) – 1-indexed position of the first kept sample. Default is 1.

Returns:

BenDecoder – self, for chaining into a for loop.

Raises:

Exception – If step or offset is 0 (both are 1-based).
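One way to read the step and offset parameters is as a 1-indexed slice with a stride. The function below is a pure-Python model of that reading, not the library's implementation. The name every is hypothetical, and raising ValueError is an assumption, since the reference only promises an Exception.

```python
from itertools import islice


def every(samples, step, offset=1):
    """Keep the samples at 1-indexed positions offset, offset + step, ..."""
    if step < 1 or offset < 1:
        raise ValueError("step and offset must be at least 1")
    # islice is 0-indexed, so shift the 1-indexed offset down by one.
    return islice(samples, offset - 1, None, step)


samples = range(1, 101)  # stand-in: sample numbers 1..100
print(list(every(samples, 25)))             # [1, 26, 51, 76]
print(list(every(samples, 25, offset=25)))  # [25, 50, 75, 100]
```

Under this reading, the default offset keeps the first sample, while setting offset equal to step keeps the 25th, 50th, and so on.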

Example

>>> for plan in BenDecoder("plans.ben").subsample_every(1000):
...     ...

subsample_indices(indices, /)

Restrict iteration to the samples at the given 1-indexed positions.

Skipped samples are never materialized as Python lists, and where the encoding variant allows it (standard, mkv_chain) whole frames are skipped without being unpacked, so this stays fast on large ensembles.

Parameters:

indices (Sequence[int]) – The 1-indexed sample numbers to keep. Duplicates are dropped; an unsorted list is sorted, with a UserWarning.

Returns:

BenDecoder – self, so the call can be chained directly into a for loop.

Raises:

Exception – If indices is empty, contains 0 (indices are 1-based), or contains an index greater than the number of samples in the stream.
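The rules above (drop duplicates, sort with a warning, reject out-of-range values) can be modeled in a few lines. This is a sketch of the documented behavior, not the library's code: normalize_indices and its n_samples argument are hypothetical, and ValueError stands in for the unspecified Exception.

```python
import warnings


def normalize_indices(indices, n_samples):
    """Return the 1-indexed positions to keep, sorted and de-duplicated."""
    indices = list(indices)
    if not indices:
        raise ValueError("indices must not be empty")
    if min(indices) < 1:
        raise ValueError("indices are 1-based")
    if max(indices) > n_samples:
        raise ValueError("index exceeds the number of samples")
    if indices != sorted(indices):
        warnings.warn("indices were not sorted; sorting them", UserWarning)
    # Drop duplicates and return the positions in ascending order.
    return sorted(set(indices))


print(normalize_indices([1, 500, 500, 9999], n_samples=10_000))
# [1, 500, 9999]
```

Sorting up front lets a reader pass over the stream once, front to back, which fits the note that skipped samples are never materialized.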

Example

>>> for plan in BenDecoder("plans.ben").subsample_indices([1, 500, 9999]):
...     ...

subsample_range(start, end, /)

Restrict iteration to a contiguous, 1-indexed inclusive range of samples.

Parameters:
  • start (int) – First sample number to keep (1-indexed, inclusive).

  • end (int) – Last sample number to keep (1-indexed, inclusive).

Returns:

BenDecoder – self, for chaining into a for loop.

Raises:

Exception – If start is 0, end is less than start, or end is greater than the number of samples in the stream.

Example

>>> list(BenDecoder("plans.ben").subsample_range(10, 15))  # samples 10, 11, 12, 13, 14, and 15