# Streaming

> Streaming outputs, reducing latency, SSEs

Streaming outputs is useful for returning partial results to the client before
all data has been processed.

For example, LLM text generation happens in incremental text chunks, so the
beginning of the reply can be sent to the client before the whole
prediction is complete.
Similarly, transcribing audio to text happens in \~30 second chunks, and the
first ones can be returned before all of them are completed.

In general, this does not reduce the overall processing time (the same
amount of work must still be done), but the initial latency until the first
response arrives can be reduced significantly.
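
The difference between initial latency and total time can be illustrated with a minimal sketch. It uses only `asyncio`; `generate_chunks` and `measure` are hypothetical names, and `asyncio.sleep` stands in for real model work:

```python
import asyncio
import time
from typing import AsyncIterator


async def generate_chunks(n: int, delay: float) -> AsyncIterator[str]:
    """Hypothetical producer: every chunk costs `delay` seconds of work."""
    for i in range(n):
        await asyncio.sleep(delay)  # Stand-in for model computation.
        yield f"chunk-{i}"


async def measure(n: int = 5, delay: float = 0.05) -> tuple[float, float]:
    """Returns (time until the first chunk, time until the last chunk)."""
    start = time.perf_counter()
    first = 0.0
    count = 0
    async for _ in generate_chunks(n, delay):
        if count == 0:
            first = time.perf_counter() - start
        count += 1
    total = time.perf_counter() - start
    return first, total


first_latency, total_time = asyncio.run(measure())
print(f"first chunk after {first_latency:.2f}s, all chunks after {total_time:.2f}s")
```

The total time is the same as without streaming, but a client can start using the first chunk after roughly one fifth of it.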

In some cases it might even reduce the overall time: streaming results
internally in a Chain allows subsequent processing steps to start sooner,
that is, the operations are pipelined more efficiently.
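
As a rough sketch of this pipelining effect, the following compares a two-stage computation with and without overlap. All names are hypothetical. The queue-fed background task stands in for an upstream Chainlet that keeps producing while the consumer works; that is an assumption of the sketch, not a description of Chains internals:

```python
import asyncio
import time
from typing import AsyncIterator

DELAY = 0.05  # Seconds of simulated work per chunk, in each stage.


async def produce(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(DELAY)
        yield i


async def process(chunk: int) -> int:
    await asyncio.sleep(DELAY)
    return chunk * 2


async def run_batched(n: int) -> list[int]:
    """Waits for the complete upstream result, then processes every chunk."""
    chunks = [c async for c in produce(n)]
    return [await process(c) for c in chunks]


async def run_pipelined(n: int) -> list[int]:
    """Processes each chunk while the producer already works on the next."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump() -> None:
        async for c in produce(n):
            await queue.put(c)
        await queue.put(None)  # Sentinel: no more chunks.

    pump_task = asyncio.create_task(pump())
    results = []
    while (c := await queue.get()) is not None:
        results.append(await process(c))
    await pump_task
    return results


async def timed(coro) -> tuple[list[int], float]:
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


batched_result, batched_time = asyncio.run(timed(run_batched(4)))
pipelined_result, pipelined_time = asyncio.run(timed(run_pipelined(4)))
print(f"batched: {batched_time:.2f}s, pipelined: {pipelined_time:.2f}s")
```

With four chunks, the batched variant needs about eight work units, while the pipelined variant needs about five, because the two stages overlap.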

# Low-level streaming

At a low level, streaming works by sending byte chunks (unicode strings are
implicitly encoded) over HTTP. The most basic way of doing this in Chains
is to implement `run_remote` as a bytes or string iterator, for example:

```python theme={"system"}
from typing import AsyncIterator
import truss_chains as chains


class Streamlet(chains.ChainletBase):

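    async def run_remote(self, inputs: ...) -> AsyncIterator[str]:
        # `make_incremental_outputs` stands for your own async generator,
        # for example one wrapping an LLM's token stream.
        async for text_chunk in make_incremental_outputs(inputs):
            yield text_chunk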
```

You are free to choose what data to represent in the byte/string chunks: it
could be raw text generated by an LLM, a JSON string, bytes or
anything else.
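
One possible convention, shown here as a sketch and not as something Chains prescribes, is newline-delimited JSON. The chunks a client receives may not line up with the chunks the server yielded, so the hypothetical `decode_stream` helper buffers bytes until a full line is available:

```python
import json
from typing import Iterable, Iterator


def encode_chunk(payload: dict) -> str:
    """Serializes one payload as a single newline-terminated JSON line."""
    return json.dumps(payload) + "\n"


def decode_stream(raw_chunks: Iterable[bytes]) -> Iterator[dict]:
    """Reassembles JSON lines from byte chunks with arbitrary boundaries."""
    buffer = b""
    for raw in raw_chunks:
        buffer += raw
        while b"\n" in buffer:
            line, buffer = buffer.split(b"\n", 1)
            if line:
                yield json.loads(line)


wire = "".join(encode_chunk({"text": t}) for t in ["Hello", " wor", "ld"]).encode()
# Simulate a transport that splits the bytes at arbitrary positions.
received = [wire[:10], wire[10:25], wire[25:]]
decoded = list(decode_stream(received))
print(decoded)
```

Because `json.dumps` escapes newlines inside strings, a raw newline can only occur between messages, which makes it a safe delimiter.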

# Server-sent events (SSEs)

A possible choice is to generate chunks that comply with the
[specification](https://html.spec.whatwg.org/multipage/server-sent-events.html)
of server-sent events.

Concretely, this means sending text chunks that consist of `data`, `event` and
potentially other fields (the `data` payload is often a JSON string), with the
content type `text/event-stream`.

However, the SSE specification is not opinionated regarding what exactly is
encoded in `data` and what `event` types exist. You have to define your own
schema that is useful for the client that consumes the data.
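
A minimal sketch of formatting and parsing such events with plain string handling is shown below. The helpers `format_sse` and `parse_sse`, the `token` event type and the JSON payload are all illustrative assumptions; a production client would normally rely on an existing SSE library:

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Formats one event as a text block terminated by a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Multi-line payloads need one `data:` field per line.
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


def parse_sse(block: str) -> dict:
    """Parses a single event block back into its `event` and `data` fields."""
    event, data_lines = "message", []  # "message" is the default event type.
    for line in block.strip("\n").split("\n"):
        field, _, value = line.partition(":")
        value = value[1:] if value.startswith(" ") else value
        if field == "event":
            event = value
        elif field == "data":
            data_lines.append(value)
    return {"event": event, "data": "\n".join(data_lines)}


chunk = format_sse(json.dumps({"token": "Hello"}), event="token")
parsed = parse_sse(chunk)
print(repr(chunk))
print(parsed)
```

A Chainlet could yield strings like `chunk` from `run_remote`; the schema of the JSON inside `data` remains a contract between you and your client.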

# Pydantic and Chainlet-Chainlet-streams

<Info>
  While the low-level streaming described above is stable, the following helper
  APIs for typed streaming are only stable for intra-Chain streaming.

  If you want to use them for end clients, please reach out to Baseten support,
  so we can discuss the stable solutions.
</Info>

Unlike the "raw" stream example above, Chains takes the general view that
input and output types should be well-defined, so that divergence and type
errors can be avoided.

Just like you type-annotate Chainlet inputs and outputs in the non-streaming
case, and use pydantic to manage more complex data structures, we built
tooling to bring the same benefits to streaming.

## Headers and footers

This also helps to solve another challenge of streaming: you might want to
send different kinds of data at the beginning or end of a stream than in
the "main" part.

For example, if you transcribe an audio file, you might want
to send many transcription segments in a stream and, at the end, send some
aggregate information such as duration, detected languages, etc.

We model typed streaming like this:

* \[optionally] send a chunk that conforms to the schema of a `Header` pydantic
  model.
* Send 0 to N chunks each conforming to the schema of an `Item` pydantic
  model.
* \[optionally] send a chunk that conforms to the schema of a `Footer` pydantic
  model.
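
To make the header, items, footer model concrete, here is a standard-library sketch of one way such a stream could be framed and validated. The frame layout (a kind byte plus a length prefix), the helper names and the example payloads are assumptions made for illustration; this is not the wire format used by Chains, where pydantic models take the place of the plain dicts:

```python
import json
import struct
from typing import Iterator, Optional

HEADER, ITEM, FOOTER = 0, 1, 2


def frame(kind: int, payload: dict) -> bytes:
    """Encodes a chunk as: 1-byte kind, 4-byte big-endian length, JSON body."""
    body = json.dumps(payload).encode("utf-8")
    return struct.pack(">BI", kind, len(body)) + body


def write_stream(header: dict, items: list[dict], footer: dict) -> Iterator[bytes]:
    yield frame(HEADER, header)
    for item in items:
        yield frame(ITEM, item)
    yield frame(FOOTER, footer)


def read_stream(data: bytes) -> tuple[Optional[dict], list[dict], Optional[dict]]:
    """Decodes all frames and enforces the header -> items -> footer order."""
    header, items, footer = None, [], None
    offset = 0
    while offset < len(data):
        kind, size = struct.unpack_from(">BI", data, offset)
        offset += 5  # Size of the kind byte plus the length prefix.
        payload = json.loads(data[offset:offset + size])
        offset += size
        if footer is not None:
            raise ValueError("no chunk may follow the footer")
        if kind == HEADER:
            if header is not None or items:
                raise ValueError("the header must be the first chunk")
            header = payload
        elif kind == ITEM:
            items.append(payload)
        elif kind == FOOTER:
            footer = payload
        else:
            raise ValueError(f"unknown chunk kind: {kind}")
    return header, items, footer


stream = b"".join(write_stream(
    {"audio_file": "talk.wav"},
    [{"text": "hello"}, {"text": "world"}],
    {"duration_sec": 4.2, "languages": ["en"]},
))
header, items, footer = read_stream(stream)
print(header, items, footer)
```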

## APIs

### StreamTypes

To have a single source of truth for the types that can be shared between
the producing Chainlet and the consuming client (either a Chainlet in the
Chain or an external client), the Chains framework uses a `StreamTypes` object:

```python theme={"system"}
import pydantic
from truss_chains import streaming


class MyDataChunk(pydantic.BaseModel):
    words: list[str]


STREAM_TYPES = streaming.stream_types(
    MyDataChunk, header_type=..., footer_type=...)
```

Note that header and footer types are optional and can be left out:

```python theme={"system"}
STREAM_TYPES = streaming.stream_types(MyDataChunk)
```

### StreamWriter

Use the `STREAM_TYPES` to create a matching stream writer:

```python theme={"system"}
from typing import AsyncIterator
import pydantic
import truss_chains as chains
from truss_chains import streaming


class MyDataChunk(pydantic.BaseModel):
    words: list[str]


STREAM_TYPES = streaming.stream_types(MyDataChunk)


class Streamlet(chains.ChainletBase):

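    async def run_remote(self, inputs: ...) -> AsyncIterator[bytes]:
        stream_writer = streaming.stream_writer(STREAM_TYPES)
        # `make_pydantic_items` stands for your own async generator of `MyDataChunk`s.
        async for item in make_pydantic_items(inputs):
            # Serialize each typed item to bytes before sending it.
            yield stream_writer.yield_item(item)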
```

If your stream types have header or footer types, corresponding
`yield_header` and `yield_footer` methods are available on the writer.

The writer serializes the pydantic data to `bytes`, so you can also
efficiently represent numeric data (see the
[binary IO guide](/development/chain/binaryio)).

### StreamReader

To consume the stream in either another Chainlet or an external client, a
matching `StreamReader` is created from your `StreamTypes`. Besides the
types, you connect the reader to the bytes generator that you obtain from the
remote invocation of the streaming Chainlet:

```python theme={"system"}
import truss_chains as chains
from truss_chains import streaming


class Consumer(chains.ChainletBase):

    def __init__(self, streamlet=chains.depends(Streamlet)):
        self._streamlet = streamlet

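    async def run_remote(self, data: ...) -> list[MyDataChunk]:
        # `Streamlet`, `MyDataChunk` and `STREAM_TYPES` come from the writer example above.
        byte_stream = self._streamlet.run_remote(data)
        reader = streaming.stream_reader(STREAM_TYPES, byte_stream)
        chunks = []
        # Use a distinct loop variable so the `data` argument is not shadowed.
        async for item in reader.read_items():
            chunks.append(item)
        return chunks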
```

If you use headers or footers, the reader has async `read_header` and
`read_footer` methods.

<Info>
  Note that the stream can only be consumed once and you have to consume
  header, items and footer in order.
</Info>
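
The single-consumption behavior mirrors that of any Python async generator, which is what the reader is connected to. A minimal sketch with a hypothetical stand-in `byte_stream` generator, independent of Chains:

```python
import asyncio
from typing import AsyncIterator


async def byte_stream() -> AsyncIterator[bytes]:
    """Stand-in for the bytes generator returned by a streaming Chainlet."""
    for chunk in (b"a", b"b", b"c"):
        yield chunk


async def consume_twice() -> tuple[list[bytes], list[bytes]]:
    stream = byte_stream()
    first_pass = [chunk async for chunk in stream]
    second_pass = [chunk async for chunk in stream]  # Already exhausted.
    return first_pass, second_pass


first_pass, second_pass = asyncio.run(consume_twice())
print(first_pass, second_pass)
```

If several parts of your code need the data, collect the items once and pass the collected list around.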

<Tip>
  The implementation of `StreamReader` only needs `pydantic`, no other Chains
  dependencies. So you can take that implementation code in isolation and
  integrate it in your client code.
</Tip>
