Streaming outputs is useful for returning partial results to the client before all data has been processed.

For example, LLM text generation produces output in incremental text chunks, so the beginning of the reply can already be sent to the client before the whole prediction is complete. Similarly, transcribing audio to text happens in chunks of roughly 30 seconds, and the first chunks can be returned before all of them are completed.

In general, this does not reduce the overall processing time (the same amount of work still has to be done), but the initial latency until the client receives a first response can be reduced significantly.

In some cases it might even reduce the overall time: streaming results internally within a Chain allows subsequent processing steps to start sooner - i.e. the operations are pipelined more efficiently.

Low-level streaming

At a low level, streaming works by sending byte chunks (unicode strings are implicitly encoded) via HTTP. The most primitive way of doing this in Chains is to implement run_remote as a bytes or string iterator, e.g.:

from typing import AsyncIterator
import truss_chains as chains


class Streamlet(chains.ChainletBase):

    async def run_remote(self, inputs: ...) -> AsyncIterator[str]:
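        # make_incremental_outputs is a placeholder for your own incremental
        # generator of text chunks (e.g. tokens from an LLM).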
        async for text_chunk in make_incremental_outputs(inputs):
            yield text_chunk

You are free to choose what data the byte/string chunks represent: it could be raw text generated by an LLM, JSON strings, bytes or anything else.

Server-sent events (SSEs)

A possible choice is to generate chunks that comply with the specification of server-sent events.

Concretely, this means sending chunks with data, event and potentially other fields (typically JSON strings in data) and using the content type text/event-stream.

However, the SSE specification is not opinionated regarding what exactly is encoded in data and which event types exist - you have to define a schema that is useful for the client that consumes the data.
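
For illustration, an SSE-style variant of the raw streaming example above could format each chunk like this (the event name and the JSON payload schema are made-up assumptions; make_incremental_outputs is the same placeholder as before):

import json
from typing import AsyncIterator
import truss_chains as chains


class SSEStreamlet(chains.ChainletBase):

    async def run_remote(self, inputs: ...) -> AsyncIterator[str]:
        async for text_chunk in make_incremental_outputs(inputs):
            # Each SSE message consists of "event:" and "data:" lines,
            # terminated by a blank line.
            payload = json.dumps({"text": text_chunk})
            yield f"event: chunk\ndata: {payload}\n\n"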

Pydantic and Chainlet-to-Chainlet streams

While the low-level streaming described above is stable, the following helper APIs for typed streaming are only stable for intra-Chain streaming.

If you want to use them for end clients, please reach out to Baseten support, so we can discuss the stable solutions.

Unlike the “raw” stream example above, Chains takes the general opinion that input and output types should be definite, so that divergence and type errors can be avoided.

Just as you type-annotate Chainlet inputs and outputs in the non-streaming case and use pydantic to manage more complex data structures, we built tooling that brings the same benefits to streaming.

Headers and footers

This also helps to solve another challenge of streaming: you might want to send different kinds of data at the beginning or end of a stream than in the “main” part.

For example, if you transcribe an audio file, you might want to send many transcription segments in a stream and, at the end, send aggregate information such as the duration and detected languages.

We model typed streaming like this:

  • [Optionally] send a chunk that conforms to the schema of a Header pydantic model.
  • Send 0 to N chunks, each conforming to the schema of an Item pydantic model.
  • [Optionally] send a chunk that conforms to the schema of a Footer pydantic model.

APIs

StreamTypes

To have a single source of truth for the types that are shared between the producing Chainlet and the consuming client (either another Chainlet in the Chain or an external client), the Chains framework uses a StreamTypes object:

import pydantic
from truss_chains import streaming


class MyDataChunk(pydantic.BaseModel):
    words: list[str]


STREAM_TYPES = streaming.stream_types(
    MyDataChunk, header_type=..., footer_type=...)
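
For example, passing concrete header and footer models could look like this (the model names and fields below are illustrative, not part of the API):

# Illustrative header/footer schemas, e.g. for the transcription scenario above.
class MyHeader(pydantic.BaseModel):
    session_id: str


class MyFooter(pydantic.BaseModel):
    duration_sec: float
    detected_languages: list[str]


STREAM_TYPES_WITH_EXTRAS = streaming.stream_types(
    MyDataChunk, header_type=MyHeader, footer_type=MyFooter)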

Note that header and footer types are optional and can be left out:

STREAM_TYPES = streaming.stream_types(MyDataChunk)

StreamWriter

Use the STREAM_TYPES to create a matching stream writer:

from typing import AsyncIterator
import pydantic
import truss_chains as chains
from truss_chains import streaming


class MyDataChunk(pydantic.BaseModel):
    words: list[str]


STREAM_TYPES = streaming.stream_types(MyDataChunk)


class Streamlet(chains.ChainletBase):

    async def run_remote(self, inputs: ...) -> AsyncIterator[bytes]:
        stream_writer = streaming.stream_writer(STREAM_TYPES)
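        # make_pydantic_items stands in for your own generator of
        # MyDataChunk instances.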
        async for item in make_pydantic_items(inputs):
            yield stream_writer.yield_item(item)

If your stream types have header or footer types, corresponding yield_header and yield_footer methods are available on the writer.
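
As a sketch, a writer that also sends a header and a footer could look like this (using the illustrative MyHeader, MyFooter and STREAM_TYPES_WITH_EXTRAS from above, and the same placeholder generator):

class StreamletWithExtras(chains.ChainletBase):

    async def run_remote(self, inputs: ...) -> AsyncIterator[bytes]:
        writer = streaming.stream_writer(STREAM_TYPES_WITH_EXTRAS)
        yield writer.yield_header(MyHeader(session_id="abc"))
        async for item in make_pydantic_items(inputs):
            yield writer.yield_item(item)
        yield writer.yield_footer(
            MyFooter(duration_sec=123.4, detected_languages=["en"]))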

The writer serializes the pydantic data to bytes, so you can also efficiently represent numeric data (see the binary IO guide).

StreamReader

To consume the stream, either in another Chainlet or in an external client, a matching StreamReader is created from your StreamTypes. Besides the types, you connect the reader to the bytes generator that you obtain from the remote invocation of the streaming Chainlet:

import truss_chains as chains
from truss_chains import streaming


class Consumer(chains.ChainletBase):

    def __init__(self, streamlet=chains.depends(Streamlet)):
        self._streamlet = streamlet

    async def run_remote(self, data: ...):
        byte_stream = self._streamlet.run_remote(data)
        reader = streaming.stream_reader(STREAM_TYPES, byte_stream)
        chunks = []
        async for item in reader.read_items():
            chunks.append(item)

If you use headers or footers, the reader has async read_header and read_footer methods.
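
Building on the illustrative header/footer example from above, a consumer could read them like this (a sketch, not a definitive implementation):

class ConsumerWithExtras(chains.ChainletBase):

    def __init__(self, streamlet=chains.depends(StreamletWithExtras)):
        self._streamlet = streamlet

    async def run_remote(self, data: ...):
        byte_stream = self._streamlet.run_remote(data)
        reader = streaming.stream_reader(STREAM_TYPES_WITH_EXTRAS, byte_stream)
        # Read header, items and footer in this order.
        header = await reader.read_header()
        items = [item async for item in reader.read_items()]
        footer = await reader.read_footer()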

Note that the stream can only be consumed once and that the header, items and footer must be consumed in that order.

The implementation of StreamReader only needs pydantic and no other Chains dependencies, so you can take that implementation code in isolation and integrate it into your client code.