import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextIteratorStreamer
from typing import Dict
from threading import Thread

# Checkpoint name passed to from_pretrained() for both the tokenizer and the model.
CHECKPOINT = "tiiuae/falcon-7b-instruct"
# Passed to generate() as max_new_tokens.
DEFAULT_MAX_NEW_TOKENS = 150
# Passed to GenerationConfig as top_p.
DEFAULT_TOP_P = 0.95


class Model:
    """Truss model wrapper that streams text generated from ``CHECKPOINT``.

    ``load`` creates the tokenizer and model; ``predict`` tokenizes a prompt,
    runs generation on a background thread, and returns a generator that
    yields decoded text as the streamer receives it.
    """

    def __init__(self, **kwargs) -> None:
        # Both attributes are populated by load(); kwargs are accepted but unused.
        self.tokenizer = None
        self.model = None

    def load(self):
        """Load the tokenizer and the causal language model for ``CHECKPOINT``."""
        self.tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
        # Reuse the EOS token for padding so that `padding=True` in predict()
        # has a pad token to use. pad_token expects the token *string*;
        # assigning eos_token_id (an int) does not register EOS as padding.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            CHECKPOINT,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map="auto",
        )

    def predict(self, request: Dict) -> Dict:
        """Stream a completion for ``request["prompt"]``.

        Args:
            request: Must contain ``"prompt"``; the key is removed from the
                dict by this call. May contain ``"do_sample"``: when truthy,
                sampling is enabled so that the temperature/top_p/top_k
                settings take effect. Defaults to ``False``.

        Returns:
            A generator yielding chunks of decoded text as they are read from
            the streamer. NOTE: the ``Dict`` annotation is kept for interface
            compatibility, but the returned object is a generator.

        Raises:
            KeyError: If ``request`` has no ``"prompt"`` key.
        """
        prompt = request.pop("prompt")
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        input_ids = inputs["input_ids"].to("cuda")
        # Pass the mask along so generate() does not have to guess which
        # positions are padding (pad and EOS share the same id here).
        attention_mask = inputs["attention_mask"].to("cuda")
        streamer = TextIteratorStreamer(self.tokenizer)
        generation_config = GenerationConfig(
            # Sampling parameters only apply when do_sample is enabled.
            do_sample=request.get("do_sample", False),
            temperature=1,
            top_p=DEFAULT_TOP_P,
            top_k=40,
        )

        with torch.no_grad():
            generation_kwargs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "generation_config": generation_config,
                "return_dict_in_generate": True,
                "output_scores": True,
                "pad_token_id": self.tokenizer.eos_token_id,
                "max_new_tokens": DEFAULT_MAX_NEW_TOKENS,
                "streamer": streamer,
            }

            # Kick off a new thread to execute the model generation.
            # As the model generates outputs, they will be readable
            # from the Streamer object.
            thread = Thread(
                target=self.model.generate,
                kwargs=generation_kwargs,
            )
            thread.start()

            # We return a generator that iterates over content in the
            # streamer object, then joins the worker thread once the
            # streamer is exhausted.
            def inner():
                for text in streamer:
                    yield text
                thread.join()

            return inner()

The worst part of using generative AI tools is the long wait time during model inference. For some types of generative models, including large language models (LLMs), you can start getting results 10X faster by streaming model output as it is generated.

LLMs have two properties that make streaming output particularly useful:

  1. Generating a complete response takes time, easily 10 seconds or more for longer outputs
  2. Partial outputs are often useful!

When you host your LLMs with Baseten, you can stream responses. Instead of having to wait for the entire output to be generated, you can immediately start returning results to users with a sub-one-second time-to-first-token.

In this example, we will show you how to deploy Falcon 7B, an LLM, and stream the output as it is generated.

You can see the code for the finished Falcon 7B Truss on the right. Keep reading for step-by-step instructions on how to build it.

Step 0: Initialize Truss

Get started by creating a new Truss:

truss init falcon-7b

Give your model a name when prompted, like falcon-streaming. Then, navigate to the newly created directory:

cd falcon-7b

Step 1: Set up the Model class without streaming

As mentioned before, Falcon 7B is an LLM. We will use the Hugging Face Transformers library to load and run the model. In this first step, we will generate output normally and return it without streaming.

In model/model.py, we write the class Model with three member functions:

  • __init__, which creates an instance of the object with tokenizer and model properties (both initially None)
  • load, which runs once when the model server is spun up and loads the tokenizer and the model
  • predict, which runs each time the model is invoked and handles the inference. It can use any JSON-serializable type as input and output for non-streaming outputs.

Read the quickstart guide for more details on Model class implementation.

model/model.py
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextIteratorStreamer
from typing import Dict
from threading import Thread

# Checkpoint name passed to from_pretrained() for both the tokenizer and the model.
CHECKPOINT = "tiiuae/falcon-7b-instruct"
# Passed to generate() as max_new_tokens.
DEFAULT_MAX_NEW_TOKENS = 150
# Passed to GenerationConfig as top_p.
DEFAULT_TOP_P = 0.95


class Model:
    """Truss model wrapper that runs non-streaming generation from ``CHECKPOINT``.

    ``load`` creates the tokenizer and model; ``predict`` tokenizes a prompt
    and returns the result of a single blocking ``generate`` call.
    """

    def __init__(self, **kwargs) -> None:
        # Both attributes are populated by load(); kwargs are accepted but unused.
        self.tokenizer = None
        self.model = None

    def load(self):
        """Load the tokenizer and the causal language model for ``CHECKPOINT``."""
        self.tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
        # Reuse the EOS token for padding so that `padding=True` in predict()
        # has a pad token to use. pad_token expects the token *string*;
        # assigning eos_token_id (an int) does not register EOS as padding.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            CHECKPOINT,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map="auto",
        )

    def predict(self, request: Dict) -> Dict:
        """Generate a completion for ``request["prompt"]`` in one blocking call.

        Args:
            request: Must contain ``"prompt"``; the key is removed from the
                dict by this call. May contain ``"do_sample"``: when truthy,
                sampling is enabled so that the temperature/top_p/top_k
                settings take effect. Defaults to ``False``.

        Returns:
            Whatever ``self.model.generate`` returns when called with
            ``return_dict_in_generate=True`` and ``output_scores=True``.
            NOTE(review): the result is not decoded to text here — confirm
            that the caller can serialize it.

        Raises:
            KeyError: If ``request`` has no ``"prompt"`` key.
        """
        prompt = request.pop("prompt")
        # The steps in producing an output are to:
        #   1. Tokenize the input
        #   2. Set up generation parameters
        #   3. Call the model.generate function
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        input_ids = inputs["input_ids"].to("cuda")
        # Pass the mask along so generate() does not have to guess which
        # positions are padding (pad and EOS share the same id here).
        attention_mask = inputs["attention_mask"].to("cuda")
        # These generation parameters can be tuned
        # to better produce the output that you are looking for.
        generation_config = GenerationConfig(
            # Sampling parameters only apply when do_sample is enabled.
            do_sample=request.get("do_sample", False),
            temperature=1,
            top_p=DEFAULT_TOP_P,
            top_k=40,
        )

        with torch.no_grad():
            generation_kwargs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "generation_config": generation_config,
                "return_dict_in_generate": True,
                "output_scores": True,
                "pad_token_id": self.tokenizer.eos_token_id,
                "max_new_tokens": DEFAULT_MAX_NEW_TOKENS,
            }
            return self.model.generate(
                **generation_kwargs
            )

Step 2: Add streaming support

Once we have a model that can produce LLM output using the Hugging Face Transformers library, we can adapt it to support streaming. The key change happens in the predict function.

In the example above, the predict function returns a Dict containing the model output. To stream results, we instead need to return a Python generator from the predict function. This allows us to return partial results to the user as they are generated.

To produce outputs incrementally for the LLM, we will pass a TextIteratorStreamer object to the generate function. This object will return the model output as it is generated. We will then kick off the generation on a separate thread.

What we return from the predict function is a generator that will yield the model output from the streamer object as it is generated.

model/model.py
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextIteratorStreamer
from typing import Dict
from threading import Thread

# Checkpoint name passed to from_pretrained() for both the tokenizer and the model.
CHECKPOINT = "tiiuae/falcon-7b-instruct"
# Passed to generate() as max_new_tokens.
DEFAULT_MAX_NEW_TOKENS = 150
# Passed to GenerationConfig as top_p.
DEFAULT_TOP_P = 0.95


class Model:
    """Truss model wrapper that streams text generated from ``CHECKPOINT``.

    ``load`` creates the tokenizer and model; ``predict`` tokenizes a prompt,
    runs generation on a background thread, and returns a generator that
    yields decoded text as the streamer receives it.
    """

    def __init__(self, **kwargs) -> None:
        # Both attributes are populated by load(); kwargs are accepted but unused.
        self.tokenizer = None
        self.model = None

    def load(self):
        """Load the tokenizer and the causal language model for ``CHECKPOINT``."""
        self.tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
        # Reuse the EOS token for padding so that `padding=True` in predict()
        # has a pad token to use. pad_token expects the token *string*;
        # assigning eos_token_id (an int) does not register EOS as padding.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            CHECKPOINT,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map="auto",
        )

    def predict(self, request: Dict) -> Dict:
        """Stream a completion for ``request["prompt"]``.

        Args:
            request: Must contain ``"prompt"``; the key is removed from the
                dict by this call. May contain ``"do_sample"``: when truthy,
                sampling is enabled so that the temperature/top_p/top_k
                settings take effect. Defaults to ``False``.

        Returns:
            A generator yielding chunks of decoded text as they are read from
            the streamer. NOTE: the ``Dict`` annotation is kept for interface
            compatibility, but the returned object is a generator.

        Raises:
            KeyError: If ``request`` has no ``"prompt"`` key.
        """
        prompt = request.pop("prompt")
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        input_ids = inputs["input_ids"].to("cuda")
        # Pass the mask along so generate() does not have to guess which
        # positions are padding (pad and EOS share the same id here).
        attention_mask = inputs["attention_mask"].to("cuda")
        streamer = TextIteratorStreamer(self.tokenizer)
        generation_config = GenerationConfig(
            # Sampling parameters only apply when do_sample is enabled.
            do_sample=request.get("do_sample", False),
            temperature=1,
            top_p=DEFAULT_TOP_P,
            top_k=40,
        )

        with torch.no_grad():
            generation_kwargs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "generation_config": generation_config,
                "return_dict_in_generate": True,
                "output_scores": True,
                "pad_token_id": self.tokenizer.eos_token_id,
                "max_new_tokens": DEFAULT_MAX_NEW_TOKENS,
                "streamer": streamer,
            }

            # Run generation on a worker thread; its output becomes readable
            # from the streamer while this call returns immediately.
            thread = Thread(
                target=self.model.generate,
                kwargs=generation_kwargs,
            )
            thread.start()

            # Yield text from the streamer as it arrives, then join the
            # worker thread once the streamer is exhausted.
            def inner():
                for text in streamer:
                    yield text
                thread.join()

            return inner()

Step 3: Add remainder of Truss configuration

Once we have the model code written, the next thing we need to do before we deploy is make sure that we have the rest of the Truss configuration in place.

The only things we need to add to the config.yaml are the Python and hardware requirements for the model.

config.yaml
model_name: falcon-streaming
requirements:
- torch==2.0.1
- peft==0.4.0
- scipy==1.11.1
- sentencepiece==0.1.99
- accelerate==0.21.0
- bitsandbytes==0.41.1
- einops==0.6.1
- transformers==4.31.0
resources:
  cpu: "3"
  memory: 14Gi
  use_gpu: true
  accelerator: A10G

Step 4: Deploy the model

You’ll need a Baseten API key for this step.

We have successfully packaged Falcon as a Truss. Let’s deploy! Run:

truss push

Step 5: Invoke the model

You can invoke the model with:

truss predict -d '{"prompt": "Tell me about falcons", "do_sample": true}'
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextIteratorStreamer
from typing import Dict
from threading import Thread

# Checkpoint name passed to from_pretrained() for both the tokenizer and the model.
CHECKPOINT = "tiiuae/falcon-7b-instruct"
# Passed to generate() as max_new_tokens.
DEFAULT_MAX_NEW_TOKENS = 150
# Passed to GenerationConfig as top_p.
DEFAULT_TOP_P = 0.95


class Model:
    """Truss model wrapper that streams text generated from ``CHECKPOINT``.

    ``load`` creates the tokenizer and model; ``predict`` tokenizes a prompt,
    runs generation on a background thread, and returns a generator that
    yields decoded text as the streamer receives it.
    """

    def __init__(self, **kwargs) -> None:
        # Both attributes are populated by load(); kwargs are accepted but unused.
        self.tokenizer = None
        self.model = None

    def load(self):
        """Load the tokenizer and the causal language model for ``CHECKPOINT``."""
        self.tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
        # Reuse the EOS token for padding so that `padding=True` in predict()
        # has a pad token to use. pad_token expects the token *string*;
        # assigning eos_token_id (an int) does not register EOS as padding.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.model = AutoModelForCausalLM.from_pretrained(
            CHECKPOINT,
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
            device_map="auto",
        )

    def predict(self, request: Dict) -> Dict:
        """Stream a completion for ``request["prompt"]``.

        Args:
            request: Must contain ``"prompt"``; the key is removed from the
                dict by this call. May contain ``"do_sample"``: when truthy,
                sampling is enabled so that the temperature/top_p/top_k
                settings take effect. Defaults to ``False``.

        Returns:
            A generator yielding chunks of decoded text as they are read from
            the streamer. NOTE: the ``Dict`` annotation is kept for interface
            compatibility, but the returned object is a generator.

        Raises:
            KeyError: If ``request`` has no ``"prompt"`` key.
        """
        prompt = request.pop("prompt")
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            max_length=512,
            truncation=True,
            padding=True,
        )
        input_ids = inputs["input_ids"].to("cuda")
        # Pass the mask along so generate() does not have to guess which
        # positions are padding (pad and EOS share the same id here).
        attention_mask = inputs["attention_mask"].to("cuda")
        streamer = TextIteratorStreamer(self.tokenizer)
        generation_config = GenerationConfig(
            # Sampling parameters only apply when do_sample is enabled.
            do_sample=request.get("do_sample", False),
            temperature=1,
            top_p=DEFAULT_TOP_P,
            top_k=40,
        )

        with torch.no_grad():
            generation_kwargs = {
                "input_ids": input_ids,
                "attention_mask": attention_mask,
                "generation_config": generation_config,
                "return_dict_in_generate": True,
                "output_scores": True,
                "pad_token_id": self.tokenizer.eos_token_id,
                "max_new_tokens": DEFAULT_MAX_NEW_TOKENS,
                "streamer": streamer,
            }

            # Kick off a new thread to execute the model generation.
            # As the model generates outputs, they will be readable
            # from the Streamer object.
            thread = Thread(
                target=self.model.generate,
                kwargs=generation_kwargs,
            )
            thread.start()

            # We return a generator that iterates over content in the
            # streamer object, then joins the worker thread once the
            # streamer is exhausted.
            def inner():
                for text in streamer:
                    yield text
                thread.join()

            return inner()