
Prerequisites

To use Chains, install a recent Truss version and ensure pydantic is v2:

pip install --upgrade truss 'pydantic>=2.0.0'

To deploy Chains remotely, you also need a Baseten account. It is handy to export your API key to the current shell session or permanently in your .bashrc:

~/.bashrc
export BASETEN_API_KEY="nPh8..."

Overview

This example shows how to transcribe audio media files to text quickly and at high quality using a Chain. To achieve this, we combine several methods:

  • Partitioning large input files (10h+) into smaller chunks.
  • Analyzing the audio for silence to find optimal split points of the chunks.
  • Distributing the chunk tasks across auto-scaling Baseten deployments.
  • Using batching with a highly optimized transcription model to maximize GPU utilization.
  • Range downloads and pipelining of audio extraction to minimize latency.
  • asyncio for concurrent execution of tasks.

The implementation is quite a bit of code, located in the Chains examples repo. This guide is a commentary on the code, pointing out critical parts or explaining design choices.

If you want to try out this Chain and create a customized version of it, check out the try it yourself section below.

The Chain structure

The chunking has a 2-step hierarchy:

  1. “Macro chunks” partition the full media into segments of roughly 300s. This limits the workload of a single MacroChunkWorker to that duration, even for very long files, and lets the source data for the different macro chunks be downloaded in parallel, which makes processing very long files much faster. For shorter inputs, there is only a single “macro chunk”.
  2. “Micro chunks” have durations in the range of 5-30s. These are sent to the transcription model.

More details in the explanations of the Chainlets below.
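To make the macro level concrete, here is a minimal sketch of fixed-length partitioning. The names ChunkInterval and partition are hypothetical and not taken from the example code; the sketch only illustrates the idea of cutting a duration into consecutive intervals of at most macro_chunk_size_sec.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkInterval:
    """Hypothetical container for a chunk's time range in seconds."""

    start_time_sec: float
    end_time_sec: float


def partition(duration_sec: float, chunk_size_sec: float) -> list[ChunkInterval]:
    """Splits [0, duration_sec] into consecutive intervals of at most chunk_size_sec."""
    if chunk_size_sec <= 0:
        raise ValueError("chunk_size_sec must be positive")
    intervals = []
    start = 0.0
    while start < duration_sec:
        # The last interval is shortened so it ends exactly at the media end.
        end = min(start + chunk_size_sec, duration_sec)
        intervals.append(ChunkInterval(start, end))
        start = end
    return intervals


# A ~734s input with 300s macro chunks yields three intervals.
macro_chunks = partition(734.261406, 300)
for chunk in macro_chunks:
    print(chunk)
```

An input shorter than the chunk size, e.g. partition(120, 300), yields a single interval, which matches the single “macro chunk” case described above.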

The WhisperModel is split off the transcription Chain. This is optional, but has some advantages:

  • A lot of “business logic”, which might change more frequently, is implemented in the Chain. When developing the Chain and re-deploying it frequently, the dev loop is faster if the Whisper model is not re-deployed each time, since a large GPU model with heavy dependencies is slow to deploy.
  • The Whisper model can be used in other Chains, or standalone, if it’s not part of this Chain. Specifically, the same model can be used by the dev and prod versions of a Chain - otherwise a separate Whisper model would need to be deployed with each environment.
  • When making changes and improvements to the Whisper model, its development can be split off from the development of the Chain - think of a separation of concerns into high-level (the Chain) and low-level (the model) development.

More information on how to use and deploy non-Chain models within a Chain is given in the WhisperModel section below.

Transcribe

This Chainlet is the “entrypoint” to the Chain: external clients send transcription requests to it. Its endpoint implementation has the following signature:

async def run_remote(
  self,
  media_url: str,
  params: data_types.TranscribeParams
) -> data_types.TranscribeOutput:

The input arguments are separated into media_url, the audio source to work on, and params that control the execution, e.g. the chunk sizes. You can find the exact schemas and docstrings of these arguments in data_types.py. An example request looks like this:

curl -X POST $INVOCATION_URL \
    -H "Authorization: Api-Key $BASETEN_API_KEY" \
    -d '<JSON_INPUT>'

with JSON input:

{
  "media_url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/TearsOfSteel.mp4",
  "params": {
    "micro_chunk_size_sec": 30,
    "macro_chunk_size_sec": 300
  }
}

The output looks like this (truncated):

{
    "segments": [
        ...
        {
            "start_time_sec": 517.9465,
            "end_time_sec": 547.70975,
            "text": "The world's changed, Celia. Maybe we can too. Memory override complete!",
            "language": "english",
            "bcp47_key": "en"
        },
        {
            "start_time_sec": 547.70975,
            "end_time_sec": 567.0716874999999,
            "text": "You know, there's a lesson to be learned from this. Could've gone worse.",
            "language": "english",
            "bcp47_key": "en"
        },
        ...
    ],
    "input_duration_sec": 734.261406,
    "processing_duration_sec": 82.42135119438171,
    "speedup": 8.908631020478238
}
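Judging from the numbers in this sample, speedup appears to be the ratio of input_duration_sec to processing_duration_sec. This is an inference from the sample output, not a documented definition. The short check below uses only the three values shown above.

```python
import math

# Values copied from the sample output above.
output = {
    "input_duration_sec": 734.261406,
    "processing_duration_sec": 82.42135119438171,
    "speedup": 8.908631020478238,
}

# Assumption: speedup = media duration / wall-clock processing time.
ratio = output["input_duration_sec"] / output["processing_duration_sec"]
print(math.isclose(ratio, output["speedup"], rel_tol=1e-6))
```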

The Transcribe Chainlet does the following:

  • Asserts that the media URL supports range downloads. This is usually a given for video / audio hosting services.
  • Uses FFMPEG to query the length of the medium (both video and audio files are supported).
  • Generates a list of “macro chunks”, defined by their start and end times. The length is defined by macro_chunk_size_sec in TranscribeParams. This will soon be upgraded to find silence-aware split points, so that a chunk does not end in the middle of a spoken word. To do this, a small segment around the desired chunk boundary is downloaded (e.g. +/- 5 seconds) and the most silent timestamp within it is determined.
  • Sends the media URL with chunk limits as “tasks” to MacroChunkWorker. Using asyncio.ensure_future, these tasks are dispatched concurrently - meaning that the loop over the chunks does not wait for each chunk to complete before dispatching the task for the next chunk. These “calls” are network requests (RPCs) to the MacroChunkWorker Chainlet, which runs on its own deployment and can auto-scale depending on the load.
  • Once all tasks are dispatched, it waits for the results and concatenates all the partial transcriptions from the chunks into a final output.
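The dispatch-then-wait pattern from the last two steps can be sketched with plain asyncio. This is an illustration only: transcribe_macro_chunk is a hypothetical stand-in for the RPC to MacroChunkWorker, the URL is a placeholder, and the sleep merely simulates latency.

```python
import asyncio


async def transcribe_macro_chunk(media_url: str, start_sec: float, end_sec: float) -> str:
    """Hypothetical stand-in for the RPC to MacroChunkWorker."""
    await asyncio.sleep(0.01)  # Simulates network and processing latency.
    return f"[{start_sec:.0f}-{end_sec:.0f}]"


async def transcribe(media_url: str, chunk_limits: list[tuple[float, float]]) -> str:
    # Dispatch all tasks first: ensure_future schedules each coroutine on the
    # event loop without waiting for it to finish.
    tasks = [
        asyncio.ensure_future(transcribe_macro_chunk(media_url, start, end))
        for start, end in chunk_limits
    ]
    # gather returns results in dispatch order, so the partial transcriptions
    # can be concatenated directly.
    partial_texts = await asyncio.gather(*tasks)
    return " ".join(partial_texts)


result = asyncio.run(
    transcribe("https://example.com/media.mp4", [(0, 300), (300, 600), (600, 734)])
)
print(result)
```

Because all three tasks are in flight at once, the total wait is close to the latency of a single call rather than the sum of all three.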

MacroChunkWorker

The MacroChunkWorker Chainlet works on chunk tasks it receives from the Transcribe Chainlet. For each chunk it does the following:

  • It starts a DownloadSubprocess asynchronously (i.e. this will need CPU on the machine, but not block the event loop of the main process, making it possible to serve multiple concurrent requests).
  • In DownloadSubprocess, FFMPEG is used to download the relevant time range from the source. It extracts the audio wave form and streams the raw wave bytes back to the main process. This happens on-the-fly (i.e. not waiting for the full download to complete) - so the initial latency until receiving wave bytes is minimized. Furthermore, it resamples the wave form to the sampling rate expected by the transcription model and averages multichannel audio to a mono signal.
  • The main process reads as many bytes from the wave stream as needed for micro_chunk_size_sec (5-30s).
  • A helper function _find_silent_split_point analyzes the wave form to find the most silent point in the second half of the chunk. E.g. if micro_chunk_size_sec is 5s, it searches for the most silent point between 2.5 and 5.0s and uses this time to partition the chunk.
  • The wave bytes are converted to wave file format (i.e. including metadata in the header) and then b64-encoded, so they can be sent as JSON via HTTP.
  • For each b64-encoded “micro” chunk, the transcription model is invoked.
  • As in the Transcribe Chainlet, these tasks are concurrent RPCs, and the transcription model deployment can auto-scale with the load.
  • Finally, we wait for all “micro chunk” results, concatenate them into a “macro chunk” result and return it to Transcribe.
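The split-point search and the WAV/b64 encoding steps can be sketched with the standard library alone. This is not the example's implementation of _find_silent_split_point: the windowed-energy heuristic, the 16 kHz sampling rate, the window size and all function names below are assumptions chosen for illustration.

```python
import base64
import io
import wave
from array import array

SAMPLE_RATE_HZ = 16000  # Assumed sampling rate expected by the model.


def find_silent_split_point(samples: array, window: int = 160) -> int:
    """Returns the center index of the lowest-energy window in the second half."""
    half = len(samples) // 2
    best_index, best_energy = len(samples), float("inf")
    for start in range(half, len(samples) - window + 1, window):
        energy = sum(s * s for s in samples[start : start + window])
        if energy < best_energy:
            best_index, best_energy = start + window // 2, energy
    return best_index


def to_b64_wav(samples: array) -> str:
    """Wraps mono 16-bit PCM samples in a WAV container and b64-encodes it."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)  # 2 bytes per sample = 16-bit PCM.
        wav_file.setframerate(SAMPLE_RATE_HZ)
        wav_file.writeframes(samples.tobytes())
    return base64.b64encode(buffer.getvalue()).decode("ascii")


# Synthetic 1s signal: loud everywhere except a 0.1s pause starting at 0.7s.
samples = array("h", [8000 if i % 2 else -8000 for i in range(SAMPLE_RATE_HZ)])
for i in range(11200, 12800):
    samples[i] = 0

split = find_silent_split_point(samples)
micro_chunk, remainder = samples[:split], samples[split:]
payload = to_b64_wav(micro_chunk)  # String that can be embedded in a JSON request.
print(split / SAMPLE_RATE_HZ, len(payload))
```

In this sketch the remainder after the split point would be carried over as the start of the next micro chunk, so no audio is dropped between chunks.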

WhisperModel

As mentioned in the structure section, the WhisperModel is separately deployed from the transcription Chain.

In the Chain implementation we only need to define a small “adapter” class WhisperModel, mainly for integrating the I/O types of that model with our Chain. This is a subclass of chains.StubBase, which abstracts sending requests, retries etc. away from us (this class is also used for all RPCs that the Chains framework makes internally). Furthermore, we need to take the invocation URL of that model (e.g. https://model-5woz91z3.api.baseten.co/production/predict) and pass it along when initializing this adapter class with the from_url factory method.

There are two options for deploying a model separately from a Chain:

As a Chainlet

This is done in this example.

As a Chainlet it can even be in the same file, but not “wired” into the Chain with the chains.depends-directive. In this example we put it into a separate file whisper_chainlet.py.

  • It will not be included in the deployment when running the truss chains push transcribe.py command for the entrypoint, since it’s not formally a tracked dependency of that Chain.
  • It is separately deployed, with a push command specifically targeting that class, i.e. truss chains push whisper_chainlet.py.

A structure like this keeps code coherence high, e.g. the pydantic models for the input and output are shared by both files (defined in the common data_types.py), while still allowing independent deployment cycles.

As a conventional Truss model

This is not done in this example.

This could be anything, from the model library, the Truss examples repo or your own Truss model.

This might be the better choice if the model has a substantial code base of its own and you want to avoid mixing it (and its development) with the Chain code.

Performance considerations

Even for very large files, e.g. 10h+, the end-to-end runtime stays bounded: since macro_chunk_size_sec is fixed, each sub-task has a bounded runtime. So, provided all Chainlet components have enough resources to auto-scale horizontally and the network bandwidth of the source hosting is sufficient, the overall runtime remains relatively small. Note that auto-scaling a component, e.g. the transcription model, to a large number of replicas can take a while, so you’ll only see the full speedup after a “warm-up” phase.

Depending on the distribution of your input durations and the “spikiness” of your traffic, there are a few knobs to tweak:

  • micro_chunk_size_sec: “micro” chunks that are too small create more overhead and leave the GPU underutilized; with chunks that are too large, the processing of a single chunk might take too long or overflow the GPU model. The sweet spot is in the middle.
  • macro_chunk_size_sec: larger chunks mean less overhead, but also less download parallelism.
  • Predict-concurrency and autoscaling settings of all deployed components. Specifically make sure that the WhisperModel can scale up to enough replicas (but should also not be underutilized). Look at the GPU and CPU utilization metrics of the deployments.
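To get a feel for how these knobs translate into load, the following back-of-the-envelope sketch estimates how many tasks a single input fans out into. It assumes, per the MacroChunkWorker description, that each micro chunk is cut somewhere in the second half of its window, so its length lies between half of and the full micro_chunk_size_sec. The function name is hypothetical, and the bounds are rough because shorter trailing chunks at each macro chunk boundary are ignored.

```python
import math


def estimate_task_counts(
    duration_sec: float, macro_chunk_size_sec: float, micro_chunk_size_sec: float
) -> tuple[int, int, int]:
    """Returns (macro chunk count, min micro chunks, max micro chunks)."""
    macro_count = math.ceil(duration_sec / macro_chunk_size_sec)
    # Fewest calls: every micro chunk uses the full window length.
    min_micro = math.ceil(duration_sec / micro_chunk_size_sec)
    # Most calls: every micro chunk is cut at the middle of its window.
    max_micro = math.ceil(duration_sec / (micro_chunk_size_sec / 2))
    return macro_count, min_micro, max_micro


# A 10h input with the parameters from the example request.
print(estimate_task_counts(10 * 3600, 300, 30))  # (120, 1200, 2400)
```

Under these assumptions a 10h file results in about 120 concurrent MacroChunkWorker tasks and between 1200 and 2400 transcription calls, which gives a starting point for choosing replica limits and predict-concurrency.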

Try it yourself

If you want to try this yourself, follow the steps below:

All code can be found and copied in this example directory.

  • Download the example code.
  • Deploy the Whisper Chainlet first: truss chains push whisper_chainlet.py.
  • Note the invocation URL of the form https://chain-<CHAIN_ID>.api.baseten.co/production/run_remote and insert that URL as a value for WHISPER_URL in transcribe.py. You can find the URL in the output of the push command or on the status page.
  • Deploy the transcription Chain with truss chains push transcribe.py.

As the media source URL, you can pass either a video or an audio source, as long as the format can be handled by FFMPEG and the hosted file supports range downloads. A public test file you can use is shown in the example below.

curl -X POST $INVOCATION_URL \
    -H "Authorization: Api-Key $BASETEN_API_KEY" \
    -d '<JSON_INPUT>'

with JSON input:

{
  "media_url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/TearsOfSteel.mp4",
  "params": {
    "micro_chunk_size_sec": 30,
    "macro_chunk_size_sec": 300
  }
}
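The same request can also be built from Python. The sketch below uses only the standard library and mirrors the curl call above. The function name, the placeholder URL and the placeholder API key are assumptions, and the Content-Type header is an addition that the curl example does not set.

```python
import json
import urllib.request


def build_transcribe_request(
    invocation_url: str,
    api_key: str,
    media_url: str,
    micro_chunk_size_sec: int = 30,
    macro_chunk_size_sec: int = 300,
) -> urllib.request.Request:
    """Builds (but does not send) a POST request with the JSON input shown above."""
    payload = {
        "media_url": media_url,
        "params": {
            "micro_chunk_size_sec": micro_chunk_size_sec,
            "macro_chunk_size_sec": macro_chunk_size_sec,
        },
    }
    return urllib.request.Request(
        invocation_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Api-Key {api_key}",
            "Content-Type": "application/json",  # Not set in the curl example.
        },
        method="POST",
    )


request = build_transcribe_request(
    "https://example.invalid/run_remote",  # Placeholder for your invocation URL.
    "my-api-key",  # Placeholder; in practice read BASETEN_API_KEY from the environment.
    "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/TearsOfSteel.mp4",
)

# To actually send it:
# with urllib.request.urlopen(request) as response:
#     result = json.load(response)
```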