The Performance Client is built in Rust with bindings for Python, Node.js, and native Rust, and handles concurrent POST requests efficiently. It releases the Python GIL while executing requests, enabling simultaneous sync and async usage. In benchmarks, the Performance Client reaches 1200+ requests per second per client. Use it with Baseten deployments or with third-party providers like OpenAI.

Install the client

To install the Performance Client for Python, use pip:
pip install "baseten_performance_client>=0.1.0"

Get started

To initialize the Performance Client in Python, import the class and provide your base URL and API key:
from baseten_performance_client import PerformanceClient

client = PerformanceClient(
    base_url="https://model-YOUR_MODEL_ID.api.baseten.co/environments/production/sync",
    api_key="YOUR_API_KEY"
)
The client also works with third-party providers like OpenAI by replacing the base_url.
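For example, a minimal sketch pointing the client at OpenAI (the https://api.openai.com/v1 base URL and the OPENAI_API_KEY environment variable are illustrative assumptions, not Baseten-specific values):
import os
from baseten_performance_client import PerformanceClient

# Assumption: any OpenAI-compatible endpoint can be used in place of a Baseten deployment URL.
openai_client = PerformanceClient(
    base_url="https://api.openai.com/v1",
    api_key=os.environ["OPENAI_API_KEY"]
)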

Advanced setup

Configure HTTP version selection and connection pooling for optimal performance.
To configure HTTP version and connection pooling in Python, use the http_version parameter and HttpClientWrapper:
from baseten_performance_client import PerformanceClient, HttpClientWrapper

# HTTP/1.1 (default, better for high concurrency)
client_http1 = PerformanceClient(BASE_URL, API_KEY, http_version=1)

# HTTP/2 (not recommended on Baseten)
client_http2 = PerformanceClient(BASE_URL, API_KEY, http_version=2)

# Connection pooling for multiple clients
wrapper = HttpClientWrapper(http_version=1)
client1 = PerformanceClient(base_url="https://api1.example.com", client_wrapper=wrapper)
client2 = PerformanceClient(base_url="https://api2.example.com", client_wrapper=wrapper)

Core features

Embeddings

The client provides efficient embedding requests with configurable batching, concurrency, and latency optimizations. Compatible with BEI.
To generate embeddings with Python, configure a RequestProcessingPreference and call client.embed():
from baseten_performance_client import PerformanceClient, RequestProcessingPreference

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)
texts = ["Hello world", "Example text", "Another sample"] * 10

preference = RequestProcessingPreference(
    batch_size=16,
    max_concurrent_requests=256,
    max_chars_per_request=10000,
    hedge_delay=0.5,
    timeout_s=360,
    total_timeout_s=600
)

response = client.embed(
    input=texts,
    model="my_model",
    preference=preference
)

print(f"Model used: {response.model}")
print(f"Total tokens used: {response.usage.total_tokens}")
print(f"Total time: {response.total_time:.4f}s")

# Convert to numpy array (requires numpy)
numpy_array = response.numpy()
print(f"Embeddings shape: {numpy_array.shape}")
For async usage, call await client.async_embed(input=texts, model="my_model", preference=preference).
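For example, a minimal async sketch reusing the texts and preference defined above:
import asyncio

async def main():
    # async_embed mirrors embed() but awaits instead of blocking the event loop.
    response = await client.async_embed(
        input=texts,
        model="my_model",
        preference=preference
    )
    print(f"Async total time: {response.total_time:.4f}s")

asyncio.run(main())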

Generic batch POST

Send HTTP requests to any URL path with any JSON payload. Compatible with Engine-Builder-LLM and other models. For endpoints that stream responses by default (SSE), set stream=False in the payload.
To send batch POST requests with Python, define your payloads and call client.batch_post():
from baseten_performance_client import PerformanceClient, RequestProcessingPreference

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)

payloads = [
    {"model": "my_model", "prompt": "Batch request 1", "stream": False},
    {"model": "my_model", "prompt": "Batch request 2", "stream": False}
] * 10

preference = RequestProcessingPreference(
    max_concurrent_requests=96,
    timeout_s=720,
    hedge_delay=0.5
)

response = client.batch_post(
    url_path="/v1/completions",
    payloads=payloads,
    custom_headers={"x-custom-header": "value"},
    preference=preference,
    method="POST"
)

print(f"Total time: {response.total_time:.4f}s")
Supported methods: GET, POST, PUT, PATCH, DELETE, HEAD, and OPTIONS. For async usage, call await client.async_batch_post(url_path, payloads, preference, custom_headers, method).

Reranking

Rerank documents by relevance to a query. Compatible with BEI, BEI-Bert, and text-embeddings-inference reranking endpoints.
To rerank documents with Python, provide a query and list of documents to client.rerank():
from baseten_performance_client import PerformanceClient, RequestProcessingPreference

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)

query = "What is the best framework?"
documents = ["Doc 1 text", "Doc 2 text", "Doc 3 text"]

preference = RequestProcessingPreference(
    batch_size=16,
    max_concurrent_requests=32,
    timeout_s=360,
    max_chars_per_request=256000,
    hedge_delay=0.5
)

response = client.rerank(
    query=query,
    texts=documents,
    model="rerank-model",
    return_text=True,
    preference=preference
)

for res in response.data:
    print(f"Index: {res.index} Score: {res.score}")
For async usage, call await client.async_rerank(query, texts, model, return_text, preference).

Classification

Classify text inputs into categories. Compatible with BEI and text-embeddings-inference classification endpoints.
To classify text with Python, provide a list of inputs to client.classify():
from baseten_performance_client import PerformanceClient, RequestProcessingPreference

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)

texts_to_classify = [
    "This is great!",
    "I did not like it.",
    "Neutral experience."
]

preference = RequestProcessingPreference(
    batch_size=16,
    max_concurrent_requests=32,
    timeout_s=360.0,
    max_chars_per_request=256000,
    hedge_delay=0.5
)

response = client.classify(
    inputs=texts_to_classify,
    model="classification-model",
    preference=preference
)

for group in response.data:
    for result in group:
        print(f"Label: {result.label}, Score: {result.score}")
For async usage, call await client.async_classify(inputs, model, preference).

Advanced features

Configure RequestProcessingPreference

The RequestProcessingPreference class provides unified configuration for all request processing parameters.
To configure request processing in Python, create a RequestProcessingPreference instance:
from baseten_performance_client import RequestProcessingPreference

preference = RequestProcessingPreference(
    max_concurrent_requests=64,
    batch_size=32,
    timeout_s=30.0,
    hedge_delay=0.5,
    hedge_budget_pct=0.15,
    retry_budget_pct=0.08,
    total_timeout_s=300.0
)

Parameter reference

  • max_concurrent_requests (int, default 128, range 1-1024): Maximum parallel requests.
  • batch_size (int, default 128, range 1-1024): Items per batch.
  • timeout_s (float, default 3600.0, range 1.0-7200.0): Per-request timeout in seconds.
  • hedge_delay (float, default None, range 0.2-30.0): Hedge delay in seconds (see below).
  • hedge_budget_pct (float, default 0.10, range 0.0-3.0): Percentage of requests allowed for hedging.
  • retry_budget_pct (float, default 0.05, range 0.0-3.0): Percentage of requests allowed for retries.
  • total_timeout_s (float, default None, must be ≥ timeout_s): Total operation timeout.
Hedging sends a duplicate request after the specified delay to reduce p99 latency: once the delay elapses, the request is cloned and raced against the original. Requests that fail with 429 or 5xx errors are always retried automatically.
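For example, a minimal sketch (with illustrative values) that hedges a request if it has not completed within 0.3 seconds, allowing up to 10% extra requests for hedging and 5% for retries:
from baseten_performance_client import RequestProcessingPreference

# Illustrative values: clone and race a request that is still pending after 0.3 s.
hedged_preference = RequestProcessingPreference(
    hedge_delay=0.3,
    hedge_budget_pct=0.10,
    retry_budget_pct=0.05,
    timeout_s=30.0
)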

Select HTTP version

Choose between HTTP/1.1 and HTTP/2 for optimal performance. HTTP/1.1 is recommended for high concurrency workloads.
To select the HTTP version in Python, use the http_version parameter:
from baseten_performance_client import PerformanceClient

# HTTP/1.1 (default, better for high concurrency)
client_http1 = PerformanceClient(BASE_URL, API_KEY, http_version=1)

# HTTP/2 (better for single requests)
client_http2 = PerformanceClient(BASE_URL, API_KEY, http_version=2)

Share connection pools

Share connection pools across multiple client instances to reduce overhead when connecting to multiple endpoints.
To share a connection pool in Python, create an HttpClientWrapper and pass it to each client:
from baseten_performance_client import PerformanceClient, HttpClientWrapper

wrapper = HttpClientWrapper(http_version=1)

client1 = PerformanceClient(base_url="https://api1.example.com", client_wrapper=wrapper)
client2 = PerformanceClient(base_url="https://api2.example.com", client_wrapper=wrapper)

Cancel operations

Cancel long-running operations using CancellationToken. The token provides immediate cancellation, resource cleanup, Ctrl+C support, token sharing across operations, and status checking with is_cancelled().
To cancel operations in Python, create a CancellationToken and pass it to your preference:
from baseten_performance_client import (
    PerformanceClient,
    CancellationToken,
    RequestProcessingPreference
)
import threading
import time

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)

cancel_token = CancellationToken()
preference = RequestProcessingPreference(
    max_concurrent_requests=32,
    batch_size=16,
    timeout_s=360.0,
    cancel_token=cancel_token
)

def long_operation():
    try:
        response = client.embed(
            input=["large batch"] * 1000,
            model="embedding-model",
            preference=preference
        )
        print("Operation completed")
    except ValueError as e:
        if "cancelled" in str(e):
            print("Operation was cancelled")

threading.Thread(target=long_operation).start()
time.sleep(2)
cancel_token.cancel()
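The token's status can also be checked with is_cancelled(), and the same token can be shared across operations; a brief sketch:
# Sharing one token across preferences means a single cancel() stops all of them.
if cancel_token.is_cancelled():
    print("Token cancelled; skipping any remaining work")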

Handle errors

The client raises standard exceptions for error conditions:
  • HTTPError: Authentication failures (403), server errors (5xx), endpoint not found (404).
  • Timeout: Request or total operation timeout based on timeout_s or total_timeout_s.
  • ValueError: Invalid input parameters (empty input list, invalid batch size, inconsistent embedding dimensions).
To handle errors in Python, catch the appropriate exception types:
import requests
from baseten_performance_client import PerformanceClient, RequestProcessingPreference

client = PerformanceClient(base_url=BASE_URL, api_key=API_KEY)
preference = RequestProcessingPreference(timeout_s=30.0)

try:
    response = client.embed(input=["text"], model="model", preference=preference)
    print(f"Model used: {response.model}")
except requests.exceptions.HTTPError as e:
    print(f"HTTP error: {e}, status code: {e.response.status_code}")
except requests.exceptions.Timeout as e:
    print(f"Timeout error: {e}")
except ValueError as e:
    print(f"Input error: {e}")

Configure the client

Environment variables

  • BASETEN_API_KEY: Your Baseten API key. The client also checks OPENAI_API_KEY as a fallback.
  • PERFORMANCE_CLIENT_LOG_LEVEL: Logging level. Overrides RUST_LOG. Valid values: trace, debug, info, warn, error. Default: warn.
  • PERFORMANCE_CLIENT_REQUEST_ID_PREFIX: Custom prefix for request IDs. Default: perfclient.

Configure logging

To set the logging level, use the PERFORMANCE_CLIENT_LOG_LEVEL environment variable:
PERFORMANCE_CLIENT_LOG_LEVEL=info python script.py
PERFORMANCE_CLIENT_LOG_LEVEL=debug cargo run
The PERFORMANCE_CLIENT_LOG_LEVEL variable takes precedence over RUST_LOG.

Use with Rust

The Performance Client is also available as a native Rust library. To use the Performance Client in Rust, add the dependencies and create a PerformanceClientCore instance:
use baseten_performance_client_core::{PerformanceClientCore, ClientError};
use tokio;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let api_key = std::env::var("BASETEN_API_KEY").expect("BASETEN_API_KEY not set");
    let base_url = "https://model-YOUR_MODEL_ID.api.baseten.co/environments/production/sync";

    let client = PerformanceClientCore::new(base_url, Some(api_key), None, None);

    // Generate embeddings
    let texts = vec!["Hello world".to_string(), "Example text".to_string()];
    let embedding_response = client.embed(
        texts,
        "my_model".to_string(),
        Some(16),
        Some(32),
        Some(360.0),
        Some(256000),
        Some(0.5),
        Some(360.0),
    ).await?;

    println!("Model: {}", embedding_response.model);
    println!("Total tokens: {}", embedding_response.usage.total_tokens);

    // Send batch POST requests
    let payloads = vec![
        serde_json::json!({"model": "my_model", "input": ["Rust sample 1"]}),
        serde_json::json!({"model": "my_model", "input": ["Rust sample 2"]}),
    ];

    let batch_response = client.batch_post(
        "/v1/embeddings".to_string(),
        payloads,
        Some(32),
        Some(360.0),
        Some(0.5),
        Some(360.0),
        None,
    ).await?;

    println!("Batch POST total time: {:.4}s", batch_response.total_time);

    Ok(())
}
Add these dependencies to your Cargo.toml:
[dependencies]
baseten_performance_client_core = "0.1.0"
tokio = { version = "1.0", features = ["full"] }
serde_json = "1.0"
