uhop/stream-chain

stream-chain

stream-chain processes streams of objects — records, not raw text or bytes. It is especially designed for huge streams: multi-gigabyte database dumps, append-only logs, message-queue and other live feeds generated continuously and far too large to hold in memory. You handle one record at a time while the library keeps memory flat, propagating backpressure from end to end.

Each stage of the pipeline is an ordinary function. stream-chain wires your functions — alongside any Node or Web streams you already use — into a single Duplex stream, so the whole pipeline behaves like one stream you can compose further.

Why it might be for you:

  • Simple. A stage is an ordinary function: one input in, one value out, or chain.none to drop it. The model goes further — a stage can emit any number of values per input, from zero to many. A generator function does this naturally by yielding; a plain function can return chain.many([...]) instead, but that holds every value in memory, so generators are the leaner choice for fan-out.
  • Flexible. Mix sync functions, async functions, generators, async generators, and real streams in one chain — on whichever substrate you want: Node streams (the default), native Web Streams (/web), or substrate-free async iterables (/core).
  • Performance-minded. Throughput gets deliberate attention here: the hot paths are measured and tuned (a sync fast path, automatic stage grouping, fused file edges), the kind of care pipeline code often doesn't get. Real numbers depend on your hardware and workload, so benchmark on your own setup.
  • Solid. Lightweight, zero-dependency, bundled TypeScript typings, exercised by a broad test suite across Node, Bun, and Deno.
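
The stage model described above (one value out, a sentinel to drop, a generator to fan out) can be sketched in plain JavaScript. This is an illustration only, not stream-chain's implementation: `NONE`, `runStages` and `collect` are hypothetical names invented for the sketch, and `NONE` merely stands in for the role the text assigns to chain.none.

```javascript
// Local stand-in for a "drop this value" sentinel; NOT the library's chain.none.
const NONE = Symbol('none');

// Runs every input through the stages in order and yields results lazily.
// Hypothetical helper written for this sketch.
async function* runStages(stages, source) {
  async function* apply(value, index) {
    if (index === stages.length) {
      yield value; // passed every stage: emit it
      return;
    }
    const result = await stages[index](value);
    if (result === NONE) return; // zero outputs for this input
    if (result && typeof result.next === 'function') {
      // a generator object: forward each value as soon as it is yielded
      for await (const item of result) yield* apply(item, index + 1);
      return;
    }
    yield* apply(result, index + 1); // exactly one output
  }
  for await (const value of source) yield* apply(value, 0);
}

const stages = [
  x => (x % 2 ? x : NONE), // plain function: keep odd numbers, drop even ones
  function* (x) {          // generator: two outputs per input
    yield x;
    yield x * 10;
  },
  async x => x + 1         // async function: one output per input
];

// Gathers an iterable into an array so the result can be printed.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

collect(runStages(stages, [1, 2, 3])).then(out => console.log(out)); // [ 2, 11, 4, 31 ]
```

Because the generator stage is consumed one value at a time, no intermediate array is ever built, which is the reason the text prefers generators for fan-out.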

Example

Read a huge JSONL dump, transform each record, write the result back out — record by record, in constant memory:

import chain from 'stream-chain';
import parseFile from 'stream-chain/jsonl/file/parser.js';
import stringerToFile from 'stream-chain/jsonl/file/stringer.js';

const pipeline = chain([
  parseFile(), // read line-by-line -> {key, value} per record
  ({value}) => (value.active ? value : chain.none), // drop inactive records
  async user => ({...user, plan: await lookupPlan(user.id)}), // enrich asynchronously; lookupPlan() is your own async function, not part of stream-chain
  stringerToFile('active-users.jsonl') // write results back out
]);

pipeline.on('finish', () => console.log('done'));
pipeline.end('users.jsonl'); // a 10 TB file is fine

The middle stages are plain functions; stream-chain handles the streaming, backpressure, and file handles. The source and sink don't have to be JSONL — any object stream works: a database cursor, an HTTP response, stream-json assembling tokens into objects, your own generator. JSONL just happens to be the most common on-disk format for object streams, so it ships in the box. See Intro by examples for more.
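
To make "any object stream" concrete, here is a minimal sketch of an object-mode source built from a generator and drained into a sink, using only Node's built-in stream module rather than stream-chain itself. The `users`, `collectInto` and `run` names and the sample records are hypothetical.

```javascript
import {Readable, Writable} from 'node:stream';
import {pipeline} from 'node:stream/promises';

// Hypothetical record source: a generator standing in for a cursor or a live feed.
function* users() {
  yield {id: 1, active: true};
  yield {id: 2, active: false};
  yield {id: 3, active: true};
}

// Object-mode sink that collects records; a real sink would write to a file or socket.
function collectInto(target) {
  return new Writable({
    objectMode: true,
    write(record, _encoding, callback) {
      target.push(record);
      callback(); // signals that the sink is ready for the next record
    }
  });
}

async function run() {
  const seen = [];
  // Readable.from() turns any iterable into an object-mode Readable
  await pipeline(Readable.from(users()), collectInto(seen));
  return seen;
}

run().then(seen => console.log(seen.length)); // 3
```

A Readable produced this way is an object stream in the sense used above: each chunk is a whole record, and the sink's callback paces the source.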

Installation

npm i --save stream-chain

What's in the box

  • chain() — the factory: turns an array of functions, arrays, and streams into one Duplex pipeline.
  • Transducers:
    • gen() — an async generator built from a list of functions: each stage can emit zero-to-many values per input, yielded lazily so memory stays flat no matter how much a stage fans out. It is the substrate-free core under chain(), and the safe default for unbounded pipelines.
    • fun() — an async function from a list of functions; collects outputs per input (explicit import).
  • Adapters:
    • asStream() — wraps a function as a Node Duplex.
    • asWebStream() — wraps a function as a Web Streams pair, with per-item backpressure.
  • Helpers — slicing (take, skip, …), reduce (fold, scan, …), stream adapters (readableFrom, stream pullers), and more. See utils.
  • JSONL — streaming JSON Lines I/O for object streams: a substrate-free parser() / stringer(), Node and Web wrappers, fused local-file edges (parseFile() / stringerToFile()), and per-line error handling via errorIndicator. See jsonl.
  • Subpaths: stream-chain / stream-chain/node (default Node streams), stream-chain/web (native Web Streams, browser-safe), stream-chain/core (substrate-free async iterables).
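
For readers new to JSON Lines, the idea behind a substrate-free parser and stringer can be sketched with two async generators. This is not the library's parser() or stringer(): `parseJsonLines`, `toJsonLines` and `collect` are hypothetical names, and treating `key` as a running record index is an assumption made for the sketch.

```javascript
// Parses JSON Lines from an iterable of string chunks, one record at a time.
// Yields {key, value}; using key as a running index is an assumption.
async function* parseJsonLines(chunks) {
  let rest = ''; // carries an incomplete line over to the next chunk
  let key = 0;
  for await (const chunk of chunks) {
    const lines = (rest + chunk).split('\n');
    rest = lines.pop(); // the last piece may be cut mid-line
    for (const line of lines) {
      if (line.trim()) yield {key: key++, value: JSON.parse(line)};
    }
  }
  if (rest.trim()) yield {key: key++, value: JSON.parse(rest)};
}

// Serializes values back to JSON Lines, one line per value.
async function* toJsonLines(values) {
  for await (const value of values) yield JSON.stringify(value) + '\n';
}

// Gathers an iterable into an array so the result can be inspected.
async function collect(iterable) {
  const out = [];
  for await (const item of iterable) out.push(item);
  return out;
}

// Chunk boundaries deliberately fall in the middle of a record.
const chunks = ['{"a":1}\n{"b"', ':2}\n', '{"c":3}'];
collect(parseJsonLines(chunks)).then(records => console.log(records.length)); // 3
```

Only the current line and the unfinished tail of the last chunk are held at any moment, so memory use does not grow with the size of the input.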

Full documentation is in the wiki — browse the index, or search it by name.

License

BSD-3-Clause

Release History

  • 4.2.2 Bugfix: eliminated file handle leak, cleaned up exports.
  • 4.2.1 Factory-bundled JSONL entries — stream-chain/node/jsonl & stream-chain/web/jsonl carry .asStream / .asWebStream. Bugfix: removed checkedParse(), mistakenly exposed in 4.2.0.
  • 4.2.0 JSONL file-edge components (perf): parseFile(), stringerToFile() + errorIndicator option. Bugfix: the /core chain now passes a string through as a single value.
  • 4.1.1 Performance: faster synchronous pipelines.
  • 4.1.0 Web Streams parity: new readableWebStreamFrom(), reduceWebStream(), parserWebStream(), stringerWebStream() + dataSource() + minor bugfix.
  • 4.0.2 fixUtf8Stream() now works on browsers.
  • 4.0.1 Minor bugfixes. No API changes.
  • 4.0.0 Major: moved to ESM. New subpath split: /node (default), /web (native Web Streams), /core (substrate-free). New asWebStream() adapter with per-item backpressure. See the Migration guide.
  • 3.6.3 TS inference updates: improvements and fixes for newly found bugs. Updated deps.
  • 3.6.2 Improved TS typings: ChainOutput<W, R> propagates R to events and methods (thx Scover). Updated deps.
  • 3.6.1 Technical release: updated deps.
  • 3.6.0 Performance: sync-first fun() (~2.5× faster for sync pipelines, now returns Many | Promise<Many>). Sync fast path in gen() (~1.6× faster). Documented null/undefined handling differences. Wiki: renamed V2 files for Windows compatibility.
  • 3.5.1 Fixed finish event not firing after stop. Web stream detection uses duck-typing instead of instanceof (supports non-standard web streams) (thx Alex Yang). Performance: unified fast path in asStream().
  • 3.5.0 Variadic combineMany() and combineManyMut(). Fixed readableFrom() unhandled rejection bug. Improved TS typings, docs, and package.json metadata.

The full release notes are in the wiki: Release notes.

About

Chain functions, generators, Node streams, and Web streams into a pipeline with backpressure support.
