# Any deployment by ID post /v1/models/{model_id}/deployments/{deployment_id}/activate Activates an inactive deployment and returns the activation status. # 🆕 Activate environment deployment post /v1/models/{model_id}/environments/{env_name}/activate Activates an inactive deployment associated with an environment and returns the activation status. # Development deployment post /v1/models/{model_id}/deployments/development/activate Activates an inactive development deployment and returns the activation status. # Production deployment post /v1/models/{model_id}/deployments/production/activate Activates an inactive production deployment and returns the activation status. # Cancel async request DELETE https://model-{model_id}.api.baseten.co/async_request/{request_id} Use this endpoint to cancel a queued async request. Only `QUEUED` requests may be canceled. ### Parameters The ID of the model that executed the request. The ID of the async request. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the async request. Whether the request was canceled. Additional details about whether the request was canceled. ### Rate limits Calls to the cancel async request endpoint are limited to **20 requests per second**. If this limit is exceeded, subsequent requests will receive a 429 status code. ```py Python import requests import os model_id = "" request_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.delete( f"https://model-{model_id}.api.baseten.co/async_request/{request_id}", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request DELETE \ --url https://model-{model_id}.api.baseten.co/async_request/{request_id} \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/async_request/{request_id}', { method: 'DELETE', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # Create a model environment post /v1/models/{model_id}/environments Creates an environment for the specified model and returns the environment. # Any deployment by ID post /v1/models/{model_id}/deployments/{deployment_id}/deactivate Deactivates a deployment and returns the deactivation status. # 🆕 Deactivate environment deployment post /v1/models/{model_id}/environments/{env_name}/deactivate Deactivates a deployment associated with an environment and returns the deactivation status. # Development deployment post /v1/models/{model_id}/deployments/development/deactivate Deactivates a development deployment and returns the deactivation status. # Production deployment post /v1/models/{model_id}/deployments/production/deactivate Deactivates a production deployment and returns the deactivation status. # Published deployment POST https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_predict Use this endpoint to call any [published deployment](/deploy/lifecycle) of your model. ### Parameters The ID of the model you want to call. The ID of the specific deployment you want to call. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body There is a 256 KiB size limit to `/async_predict` request payloads. JSON-serializable model input.
Baseten **does not** store model outputs. If `webhook_endpoint` is empty, your model must save prediction outputs so they can be accessed later. URL of the webhook endpoint. We require that webhook endpoints use HTTPS. Priority of the request. A lower value corresponds to a higher priority (e.g. requests with priority 0 are scheduled before requests of priority 1). `priority` is between 0 and 2, inclusive. Maximum time a request will spend in the queue before expiring. `max_time_in_queue_seconds` must be between 10 seconds and 72 hours, inclusive. Exponential backoff parameters used to retry the model predict request. Number of predict request attempts. `max_attempts` must be between 1 and 10, inclusive. Minimum time between retries in milliseconds. `initial_delay_ms` must be between 0 and 10,000 milliseconds, inclusive. Maximum time between retries in milliseconds. `max_delay_ms` must be between 0 and 60,000 milliseconds, inclusive. ### Response The ID of the async request. ### Rate limits Two types of rate limits apply when making async requests: * Calls to the `/async_predict` endpoint are limited to **200 requests per second**. * Each organization is limited to **50,000 `QUEUED` or `IN_PROGRESS` async requests**, summed across all deployments. If either limit is exceeded, subsequent `/async_predict` requests will receive a 429 status code. To avoid hitting these rate limits, we advise: * Implementing a backpressure mechanism, such as calling `/async_predict` with exponential backoff in response to 429 errors. * Monitoring the [async queue size metric](/observability/metrics#async-queue-size). If your model is accumulating a backlog of requests, consider increasing the number of requests your model can process at once by increasing the number of max replicas or the concurrency target in your autoscaling settings. 
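To make the backoff suggestion concrete, here is a minimal sketch of retrying `/async_predict` when a 429 is returned. It assumes the `requests` library and the same `model_id`, `deployment_id`, and API key placeholders used in the examples below; the retry counts and delays are illustrative, not prescriptive.

```py Python
import os
import time

import requests

model_id = ""
deployment_id = ""
baseten_api_key = os.environ["BASETEN_API_KEY"]


def submit_with_backoff(payload, max_retries=5, base_delay_s=1.0):
    """POST to /async_predict, retrying with exponential backoff on 429 responses."""
    url = f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_predict"
    for attempt in range(max_retries):
        resp = requests.post(
            url,
            headers={"Authorization": f"Api-Key {baseten_api_key}"},
            json=payload,
        )
        if resp.status_code != 429:
            resp.raise_for_status()
            return resp.json()
        # Rate limited: wait 1s, 2s, 4s, ... before retrying
        time.sleep(base_delay_s * (2 ** attempt))
    raise RuntimeError("Request still rate limited after retries")


# Example usage:
# submit_with_backoff({
#     "model_input": {"prompt": "hello world!"},
#     "webhook_endpoint": "https://example.com/webhook",
# })
```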
```py Python import requests import os model_id = "" deployment_id = "" webhook_endpoint = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.post( f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": webhook_endpoint # Optional fields for priority, max_time_in_queue_seconds, etc }, ) print(resp.json()) ``` ```sh cURL curl --request POST \ --url https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_predict \ --header "Authorization: Api-Key $BASETEN_API_KEY" \ --data '{ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook", "priority": 1, "max_time_in_queue_seconds": 100, "inference_retry_config": { "max_attempts": 3, "initial_delay_ms": 1000, "max_delay_ms": 5000 } }' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook", "priority": 1, "max_time_in_queue_seconds": 100, "inference_retry_config": { "max_attempts": 3, "initial_delay_ms": 1000, "max_delay_ms": 5000 } }), } ); const data = await resp.json(); console.log(data); ``` ```json 201 { "request_id": "" } ``` # Published deployment GET https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_queue_status Use this endpoint to get the status of a published deployment's async queue. ### Parameters The ID of the model. The ID of the deployment. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the model. The ID of the deployment. The number of requests in the deployment's async queue with `QUEUED` status (i.e. awaiting processing by the model). The number of requests in the deployment's async queue with `IN_PROGRESS` status (i.e. currently being processed by the model). ```json 200 { "model_id": "", "deployment_id": "", "num_queued_requests": 12, "num_in_progress_requests": 3 } ``` ### Rate limits Calls to the `/async_queue_status` endpoint are limited to **20 requests per second**. If this limit is exceeded, subsequent requests will receive a 429 status code. To gracefully handle hitting this rate limit, we advise implementing a backpressure mechanism, such as calling `/async_queue_status` with exponential backoff in response to 429 errors. 
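If you prefer to throttle submissions based on queue depth rather than reacting to 429s, a minimal sketch along these lines may help. The `requests` library and the queue-size threshold are assumptions; pick a threshold that matches your workload and autoscaling settings.

```py Python
import os
import time

import requests

model_id = ""
deployment_id = ""
baseten_api_key = os.environ["BASETEN_API_KEY"]

MAX_QUEUED = 1000  # Illustrative threshold; tune to your workload


def wait_for_queue_capacity(poll_interval_s=5.0):
    """Block until the deployment's async queue drops below MAX_QUEUED requests."""
    url = f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_queue_status"
    while True:
        resp = requests.get(url, headers={"Authorization": f"Api-Key {baseten_api_key}"})
        if resp.status_code == 200 and resp.json()["num_queued_requests"] < MAX_QUEUED:
            return
        # Queue is still deep (or we were rate limited); wait before polling again
        time.sleep(poll_interval_s)
```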
```py Python import requests import os model_id = "" deployment_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.get( f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_queue_status", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request GET \ --url https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_queue_status \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/async_queue_status', { method: 'GET', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # Published deployment POST https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/predict Use this endpoint to call any [published deployment](/deploy/lifecycle) of your model. ```sh https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/predict ``` ### Parameters The ID of the model you want to call. The ID of the specific deployment you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable model input. ```py Python import urllib3 import os model_id = "" deployment_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```sh Truss truss predict --model-version DEPLOYMENT_ID -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by model {} ``` # Published deployment POST https://chain-{chain_id}.api.baseten.co/deployment/{deployment_id}/run_remote Use this endpoint to call any [published deployment](/deploy/lifecycle) of your chain. ```sh https://chain-{chain_id}.api.baseten.co/deployment/{deployment_id}/run_remote ``` ### Parameters The ID of the chain you want to call. The ID of the specific deployment you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable chain input. The input schema corresponds to the signature of the entrypoint's `run_remote` method. I.e. The top-level keys are the argument names. The values are the corresponding JSON representation of the types.
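As an illustration of that mapping, here is a minimal sketch assuming the `truss_chains` SDK; the chainlet and argument names are made up for the example. An entrypoint whose `run_remote` takes `prompt` and `num_steps` is called with a JSON body whose top-level keys are exactly those argument names.

```py Python
import truss_chains as chains


class Entrypoint(chains.ChainletBase):
    # The signature of run_remote defines the chain's input schema:
    # top-level JSON keys are the argument names.
    def run_remote(self, prompt: str, num_steps: int = 10) -> str:
        return f"{prompt} ({num_steps} steps)"


# Matching request body for /run_remote:
payload = {"prompt": "A sunset over the ocean", "num_steps": 25}
```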
```py Python import urllib3 import os chain_id = "" deployment_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://chain-{chain_id}.api.baseten.co/deployment/{deployment_id}/run_remote", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable chain input ) print(resp.json()) ``` ```sh cURL curl -X POST https://chain-{chain_id}.api.baseten.co/deployment/{deployment_id}/run_remote \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable chain input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://chain-{chain_id}.api.baseten.co/deployment/{deployment_id}/run_remote', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable chain input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by chain {} ``` # Published deployment POST https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/wake Use this endpoint to wake any scaled-to-zero [published deployment](/deploy/lifecycle) of your model. ```sh https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/wake ``` ### Parameters The ID of the model you want to wake. The ID of the specific deployment you want to wake. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ```py Python import urllib3 import os model_id = "" deployment_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/wake", headers={"Authorization": f"Api-Key {baseten_api_key}"}, ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/wake \ -H 'Authorization: Api-Key YOUR_API_KEY' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/deployment/{deployment_id}/wake', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // Returns a 202 response code {} ``` # Development deployment POST https://model-{model_id}.api.baseten.co/development/async_predict Use this endpoint to call the [development deployment](/deploy/lifecycle) of your model asynchronously. ### Parameters The ID of the model you want to call. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body There is a 256 KiB size limit to `/async_predict` request payloads. JSON-serializable model input. Baseten **does not** store model outputs. If `webhook_endpoint` is empty, your model must save prediction outputs so they can be accessed later. URL of the webhook endpoint. We require that webhook endpoints use HTTPS. Priority of the request. A lower value corresponds to a higher priority (e.g. requests with priority 0 are scheduled before requests of priority 1). `priority` is between 0 and 2, inclusive. Maximum time a request will spend in the queue before expiring. `max_time_in_queue_seconds` must be between 10 seconds and 72 hours, inclusive. Exponential backoff parameters used to retry the model predict request. Number of predict request attempts. `max_attempts` must be between 1 and 10, inclusive.
Minimum time between retries in milliseconds. `initial_delay_ms` must be between 0 and 10,000 milliseconds, inclusive. Maximum time between retries in milliseconds. `max_delay_ms` must be between 0 and 60,000 milliseconds, inclusive. ### Response The ID of the async request. ### Rate limits Two types of rate limits apply when making async requests: * Calls to the `/async_predict` endpoint are limited to **200 requests per second**. * Each organization is limited to **50,000 `QUEUED` or `IN_PROGRESS` async requests**, summed across all deployments. If either limit is exceeded, subsequent `/async_predict` requests will receive a 429 status code. To avoid hitting these rate limits, we advise: * Implementing a backpressure mechanism, such as calling `/async_predict` with exponential backoff in response to 429 errors. * Monitoring the [async queue size metric](/observability/metrics#async-queue-size). If your model is accumulating a backlog of requests, consider increasing the number of requests your model can process at once by increasing the number of max replicas or the concurrency target in your autoscaling settings. ```py Python import requests import os model_id = "" webhook_endpoint = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.post( f"https://model-{model_id}.api.baseten.co/development/async_predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": webhook_endpoint # Optional fields for priority, max_time_in_queue_seconds, etc }, ) print(resp.json()) ``` ```sh cURL curl --request POST \ --url https://model-{model_id}.api.baseten.co/development/async_predict \ --header "Authorization: Api-Key $BASETEN_API_KEY" \ --data '{ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook", "priority": 1, "max_time_in_queue_seconds": 100, "inference_retry_config": { "max_attempts": 3, "initial_delay_ms": 1000, "max_delay_ms": 5000 } }' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/development/async_predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook", "priority": 1, "max_time_in_queue_seconds": 100, "inference_retry_config": { "max_attempts": 3, "initial_delay_ms": 1000, "max_delay_ms": 5000 } }), } ); const data = await resp.json(); console.log(data); ``` ```json 201 { "request_id": "" } ``` # Development deployment GET https://model-{model_id}.api.baseten.co/development/async_queue_status Use this endpoint to get the status of a development deployment's async queue. ### Parameters The ID of the model. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the model. The ID of the deployment. The number of requests in the deployment's async queue with `QUEUED` status (i.e. awaiting processing by the model). The number of requests in the deployment's async queue with `IN_PROGRESS` status (i.e. currently being processed by the model). ```json 200 { "model_id": "", "deployment_id": "", "num_queued_requests": 12, "num_in_progress_requests": 3 } ``` ### Rate limits Calls to the `/async_queue_status` endpoint are limited to **20 requests per second**. 
If this limit is exceeded, subsequent requests will receive a 429 status code. To gracefully handle hitting this rate limit, we advise implementing a backpressure mechanism, such as calling `/async_queue_status` with exponential backoff in response to 429 errors. ```py Python import requests import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.get( f"https://model-{model_id}.api.baseten.co/development/async_queue_status", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request GET \ --url https://model-{model_id}.api.baseten.co/development/async_queue_status \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/development/async_queue_status', { method: 'GET', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # Development deployment POST https://model-{model_id}.api.baseten.co/development/predict Use this endpoint to call the [development deployment](/deploy/lifecycle) of your model. ```sh https://model-{model_id}.api.baseten.co/development/predict ``` ### Parameters The ID of the model you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable model input. ```py Python import urllib3 import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/development/predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/development/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```sh Truss truss predict --model-version DEPLOYMENT_ID -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/development/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by model {} ``` # Development deployment POST https://chain-{chain_id}.api.baseten.co/development/run_remote Use this endpoint to call the [development deployment](/deploy/lifecycle) of your chain. ```sh https://chain-{chain_id}.api.baseten.co/development/run_remote ``` ### Parameters The ID of the chain you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable chain input. The input schema corresponds to the signature of the entrypoint's `run_remote` method. I.e. The top-level keys are the argument names. The values are the corresponding JSON representation of the types. 
```py Python import urllib3 import os chain_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://chain-{chain_id}.api.baseten.co/development/run_remote", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable chain input ) print(resp.json()) ``` ```sh cURL curl -X POST https://chain-{chain_id}.api.baseten.co/development/run_remote \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable chain input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://chain-{chain_id}.api.baseten.co/development/run_remote', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable chain input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by chain {} ``` # Development deployment POST https://model-{model_id}.api.baseten.co/development/wake Use this endpoint to wake the [development deployment](/deploy/lifecycle) of your model if it is scaled to zero. ```sh https://model-{model_id}.api.baseten.co/development/wake ``` ### Parameters The ID of the model you want to wake. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ```py Python import urllib3 import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/development/wake", headers={"Authorization": f"Api-Key {baseten_api_key}"}, ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/development/wake \ -H 'Authorization: Api-Key YOUR_API_KEY' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/development/wake', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // Returns a 202 response code {} ``` # 🆕 Async inference by environment POST https://model-{model_id}.api.baseten.co/environments/{env_name}/async_predict Use this endpoint to call the model associated with the specified environment asynchronously. ### Parameters The ID of the model you want to call. The name of the model's environment you want to call. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body There is a 256 KiB size limit to `/async_predict` request payloads. JSON-serializable model input. Baseten **does not** store model outputs. If `webhook_endpoint` is empty, your model must save prediction outputs so they can be accessed later. URL of the webhook endpoint. We require that webhook endpoints use HTTPS. Priority of the request. A lower value corresponds to a higher priority (e.g. requests with priority 0 are scheduled before requests of priority 1). `priority` is between 0 and 2, inclusive. Maximum time a request will spend in the queue before expiring. `max_time_in_queue_seconds` must be between 10 seconds and 72 hours, inclusive. Exponential backoff parameters used to retry the model predict request. Number of predict request attempts. `max_attempts` must be between 1 and 10, inclusive. Minimum time between retries in milliseconds. `initial_delay_ms` must be between 0 and 10,000 milliseconds, inclusive.
Maximum time between retries in milliseconds. `max_delay_ms` must be between 0 and 60,000 milliseconds, inclusive. ### Response The ID of the async request. ### Rate limits Two types of rate limits apply when making async requests: * Calls to the `/async_predict` endpoint are limited to **200 requests per second**. * Each organization is limited to **50,000 `QUEUED` or `IN_PROGRESS` async requests**, summed across all deployments. If either limit is exceeded, subsequent `/async_predict` requests will receive a 429 status code. To avoid hitting these rate limits, we advise: * Implementing a backpressure mechanism, such as calling `/async_predict` with exponential backoff in response to 429 errors. * Monitoring the [async queue size metric](/observability/metrics#async-queue-size). If your model is accumulating a backlog of requests, consider increasing the number of requests your model can process at once by increasing the number of max replicas or the concurrency target in your autoscaling settings. ```py Python import requests import os model_id = "" # Replace this with your model ID env_name = "" # Replace this with your environment name webhook_endpoint = "" # Replace this with your webhook endpoint URL # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] # Call the async_predict endpoint of the deployment associated with the environment resp = requests.post( f"https://model-{model_id}.api.baseten.co/environments/{env_name}/async_predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": webhook_endpoint # Optional fields for priority, max_time_in_queue_seconds, etc }, ) print(resp.json()) ``` ```sh cURL curl --request POST \ --url https://model-{model_id}.api.baseten.co/environments/{env_name}/async_predict \ --header "Authorization: Api-Key $BASETEN_API_KEY" \ --data '{ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook" }' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/environments/{env_name}/async_predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook" }), } ); const data = await resp.json(); console.log(data); ``` ```json 201 { "request_id": "" } ``` # Environment deployment GET https://model-{model_id}.api.baseten.co/environments/{env_name}/async_queue_status Use this endpoint to get the async queue status for a model associated with the specified environment. ### Parameters The ID of the model. The name of the environment. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the model. The ID of the deployment. The number of requests in the deployment's async queue with `QUEUED` status (i.e. awaiting processing by the model). The number of requests in the deployment's async queue with `IN_PROGRESS` status (i.e. currently being processed by the model). ```json 200 { "model_id": "", "deployment_id": "", "num_queued_requests": 12, "num_in_progress_requests": 3 } ``` ### Rate limits Calls to the `/async_queue_status` endpoint are limited to **20 requests per second**. If this limit is exceeded, subsequent requests will receive a 429 status code.
To gracefully handle hitting this rate limit, we advise implementing a backpressure mechanism, such as calling `/async_queue_status` with exponential backoff in response to 429 errors. ```py Python import requests import os model_id = "" env_name = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.get( f"https://model-{model_id}.api.baseten.co/environments/{env_name}/async_queue_status", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request GET \ --url https://model-{model_id}.api.baseten.co/environments/{env_name}/async_queue_status \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/environments/{env_name}/async_queue_status', { method: 'GET', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # 🆕 Inference by environment POST https://model-{model_id}.api.baseten.co/environments/{env_name}/predict Use this endpoint to call the deployment associated with the specified [environment](/deploy/lifecycle#what-is-an-environment). ```sh https://model-{model_id}.api.baseten.co/environments/{env_name}/predict ``` ### Parameters The ID of the model you want to call. The name of the model's environment you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable model input. ```py Python import urllib3 import os model_id = "" env_name = "staging" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/environments/{env_name}/predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/environments/{env_name}/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/environments/{env_name}/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by model {} ``` # 🆕 Inference by environment POST https://chain-{chain_id}.api.baseten.co/environments/{env_name}/run_remote Use this endpoint to call the deployment associated with the specified environment. ```sh https://chain-{chain_id}.api.baseten.co/environments/{env_name}/run_remote ``` ### Parameters The ID of the chain you want to call. The name of the chain's environment you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable chain input. The input schema corresponds to the signature of the entrypoint's `run_remote` method. I.e. The top-level keys are the argument names. The values are the corresponding JSON representation of the types.
```py Python import urllib3 import os chain_id = "" env_name = "staging" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://chain-{chain_id}.api.baseten.co/environments/{env_name}/run_remote", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable chain input ) print(resp.json()) ``` ```sh cURL curl -X POST https://chain-{chain_id}.api.baseten.co/environments/{env_name}/run_remote \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable chain input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://chain-{chain_id}.api.baseten.co/environments/{env_name}/run_remote', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable chain input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by chain {} ``` # Get chain environment get /v1/chains/{chain_id}/environments/{env_name} Gets a chain environment's details and returns the chain environment. # Get all chain environments get /v1/chains/{chain_id}/environments Gets all chain environments for a given chain # Get all model environments get /v1/models/{model_id}/environments Gets all environments for a given model # Get model environment get /v1/models/{model_id}/environments/{env_name} Gets an environment's details and returns the environment. # Get async request status GET https://model-{model_id}.api.baseten.co/async_request/{request_id} Use this endpoint to get the status of an async request. ### Parameters The ID of the model that executed the request. The ID of the async request. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the async request. The ID of the model that executed the request. The ID of the deployment that executed the request. An enum representing the status of the request. Available options: `QUEUED`, `IN_PROGRESS`, `SUCCEEDED`, `FAILED`, `EXPIRED`, `CANCELED`, `WEBHOOK_FAILED` An enum representing the status of sending the predict result to the provided webhook. Available options: `PENDING`, `SUCCEEDED`, `FAILED`, `CANCELED`, `NO_WEBHOOK_PROVIDED` The time in UTC at which the async request was created. The time in UTC at which the async request's status was updated. Any errors that occurred in processing the async request. Empty if no errors occurred. An enum representing the type of error that occurred. Available options: `MODEL_PREDICT_ERROR`, `MODEL_PREDICT_TIMEOUT`, `MODEL_NOT_READY`, `MODEL_DOES_NOT_EXIST`, `MODEL_UNAVAILABLE`, `MODEL_INVALID_INPUT`, `ASYNC_REQUEST_NOT_SUPPORTED`, `INTERNAL_SERVER_ERROR` A message containing details of the error that occurred. ### Rate limits Calls to the get async request status endpoint are limited to **20 requests per second**. If this limit is exceeded, subsequent requests will receive a 429 status code. To avoid hitting this rate limit, we recommend [configuring a webhook endpoint](invoke/async#configuring-the-webhook-endpoint) to receive async predict results instead of frequently polling this endpoint for async request statuses. 
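For reference, a webhook receiver can be as small as the sketch below. FastAPI is used purely as an illustrative framework, and the handler just prints the delivered payload; see the async guide for the exact payload schema.

```py Python
from fastapi import FastAPI, Request

app = FastAPI()


@app.post("/webhook")
async def handle_async_result(request: Request):
    # Baseten does not store model outputs, so persist whatever you need here.
    payload = await request.json()
    print(payload)  # e.g. write to your database or object storage instead
    return {"ok": True}
```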
```py Python import requests import os model_id = "" request_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.get( f"https://model-{model_id}.api.baseten.co/async_request/{request_id}", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request GET \ --url https://model-{model_id}.api.baseten.co/async_request/{request_id} \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/async_request/{request_id}', { method: 'GET', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # Get a chain by ID get /v1/chains/{chain_id} # Any chain deployment by ID get /v1/chains/{chain_id}/deployments/{chain_deployment_id} # Get a model by ID get /v1/models/{model_id} # Any model deployment by ID get /v1/models/{model_id}/deployments/{deployment_id} Gets a model's deployment by ID and returns the deployment. # Development model deployment get /v1/models/{model_id}/deployments/development Gets a model's development deployment and returns the deployment. # Production model deployment get /v1/models/{model_id}/deployments/production Gets a model's production deployment and returns the deployment. # Get all chain deployments get /v1/chains/{chain_id}/deployments # Get all chains get /v1/chains # Get all model deployments get /v1/models/{model_id}/deployments # Get all models get /v1/models # Get all secrets get /v1/secrets # Model endpoint migration guide No more JSON wrapper with model output This guide covers the new predict endpoints in two parts: * Showing the format of the new model predict endpoint. * Showing the change that you must make to how you parse model output when you switch to the new endpoint. The change to model output format only applies when you switch to the new endpoints. Model output is unchanged for the old endpoints. ## Updates to endpoint paths The new endpoint uses `model_id` as part of the subdomain, where formerly it was part of the path: ```sh # Old endpoint (for production deployment) https://app.baseten.co/models/{model_id}/predict # New endpoint (for production deployment) https://model-{model_id}.api.baseten.co/production/predict ``` Updated endpoints: * The old `/models/id/predict` endpoint is now the [production deployment endpoint](/api-reference/production-predict). * The old `/model_versions/id/predict` endpoint is now the [published deployment endpoint](/api-reference/deployment-predict). * There's a new endpoint just for the development deployment of a model, the [development deployment endpoint](/api-reference/development-predict). ## Model output response format With the new model endpoints, we've changed the output format of the model response. This change simplifies model responses and removes a step in parsing model output. ### Old endpoint response format For the old endpoint, formatted `https://app.baseten.co/models/{model_id}/predict`, the model output was wrapped in a JSON dictionary with the model ID and model version ID (which is now the deployment ID): ```json Old response { "model_id":"MODEL_ID", "model_version_id":"VERSION_ID", "model_output": { // Output varies by model, this is just an example "prediction": true, "confidence": 0.7839 } } ``` These old endpoints will stay available and the response format for these old endpoints will not change.
You only need to change the way you parse your model output when switching to the new endpoints. ### New endpoint response format For the new endpoint, formatted `https://model-{model_id}.api.baseten.co/production/predict`, the model output is no longer wrapped: ```json New response // Output varies by model, this is just an example { "prediction": true, "confidence": 0.7839 } ``` So, when you change your code to use the new endpoints, also update any code for parsing model responses, as it is no longer wrapped in an additional dictionary: ```python # On old endpoints: model_output = resp.json()["model_output"] # On new endpoints: model_output = resp.json() ``` # Call primary version POST https://app.baseten.co/models/{model_id}/predict This is an old endpoint. Update to the endpoint for a [production deployment](/api-reference/production-predict) and the new model response format based on the [migration guide](/api-reference/migration-guide). Use this endpoint to call the primary version of a model (now known as the production deployment). ```sh https://app.baseten.co/models/{model_id}/predict ``` ### Parameters The ID of the model you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). JSON-serializable model input. ```py Python import urllib3 resp = urllib3.request( "POST", "https://app.baseten.co/models/MODEL_ID/predict", headers={"Authorization": "Api-Key YOUR_API_KEY"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://app.baseten.co/models/MODEL_ID/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```sh Truss truss predict --model MODEL_ID -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://app.baseten.co/models/MODEL_ID/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response { "model_id":"MODEL_ID", "model_version_id":"VERSION_ID", "model_output": { // Output varies by model } } ``` # Wake primary version POST https://app.baseten.co/models/{model_id}/wake This is an old endpoint. Update to the wake endpoint for the [production deployment](/api-reference/production-wake). Use this endpoint to wake a scaled-to-zero model version (now known as a model deployment). ```sh https://app.baseten.co/models/{model_id}/wake ``` ### Parameters The ID of the model you want to wake. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`).
```py Python import urllib3 import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://app.baseten.co/models/{model_id}/wake", headers={"Authorization": f"Api-Key {baseten_api_key}"}, ) print(resp.json()) ``` ```sh cURL curl -X POST https://app.baseten.co/models/{model_id}/wake \ -H 'Authorization: Api-Key YOUR_API_KEY' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://app.baseten.co/models/{model_id}/wake', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // Returns a 202 response code {} ``` # ChatCompletions POST https://bridge.baseten.co/v1/direct Use this endpoint with the OpenAI Python client and any [deployment](/deploy/lifecycle) of a [compatible](#output) model deployed on Baseten. If you're serving a vLLM model in [OpenAI compatible mode](https://docs.vllm.ai/en/latest/serving/openai_compatible_server.html), this endpoint will support that model out of the box. If your model does not have an OpenAI compatible mode, you can use the [previous version of the bridge](/api-reference/openai-deprecated) to make it compatible with OpenAI's client, but with a more limited set of supported features. ## Calling the model ```sh https://bridge.baseten.co/v1/direct ``` ### Parameters Parameters supported by the OpenAI ChatCompletions request can be found in the [OpenAI documentation](https://github.com/openai/openai-python/blob/main/src/openai/types/chat/completion_create_params.py). Below are details about Baseten-specific arguments that must be passed into the bridge. Typically the Hugging Face repo name (e.g. `meta-llama/Meta-Llama-3.1-70B-Instruct`). In some cases, it may be another default specified by your inference engine. Python dictionary that enables extra arguments to be supplied to the chat completion request. Baseten-specific parameters that should be passed to the bridge. The arguments should be passed as a dictionary. The string identifier for the target model. The string identifier for the target deployment. When `deployment_id` is not provided, the [production deployment](/deploy/lifecycle) will be used. ### Output Streaming and non-streaming responses are supported. The [vLLM OpenAI Server](https://github.com/vllm-project/vllm/blob/main/vllm/entrypoints/openai/serving_chat.py) is a good example of how to serve your model results. For streaming outputs, the data format must comply with the Server-Sent Events (SSE) format. A helpful example for JSON payloads can be found [here](https://hpbn.co/server-sent-events-sse/#event-stream-protocol). ### Best Practices * Pin your `openai` package version in your requirements.txt file. This helps avoid any breaking changes that get introduced through package upgrades. * If you must make breaking changes to your Truss server (i.e. to introduce a new feature), you should first publish a new model deployment, then update your API call on the client side.
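For streaming specifically, a minimal client-side sketch looks like the following. It reuses the placeholder `model_id` and `deployment_id` from the full example that follows and assumes the underlying deployment streams SSE as described above.

```py Python
from openai import OpenAI
import os

model_id = "abcd1234"       # Replace with your model ID
deployment_id = "4321cbda"  # Optional: replace with your deployment ID

client = OpenAI(
    api_key=os.environ["BASETEN_API_KEY"],
    base_url="https://bridge.baseten.co/v1/direct",
)

stream = client.chat.completions.create(
    model="meta-llama/Meta-Llama-3.1-70B-Instruct",  # Replace with your model name
    messages=[{"role": "user", "content": "Tell me a short story."}],
    stream=True,
    extra_body={"baseten": {"model_id": model_id, "deployment_id": deployment_id}},
)

for chunk in stream:
    # Each chunk carries an incremental delta of the assistant message
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```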
```py OpenAI Python client from openai import OpenAI import os model_id = "abcd1234" # Replace with your model ID deployment_id = "4321cbda" # [Optional] Replace with your deployment ID client = OpenAI( api_key=os.environ["BASETEN_API_KEY"], base_url="https://bridge.baseten.co/v1/direct" ) response = client.chat.completions.create( model="meta-llama/Meta-Llama-3.1-70B-Instruct", # Replace with your model name messages=[ {"role": "user", "content": "Who won the world series in 2020?"}, {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."}, {"role": "user", "content": "Where was it played?"} ], extra_body={ "baseten": { "model_id": model_id, "deployment_id": deployment_id } } ) print(response.choices[0].message.content) ``` ```json Example Response { "choices": [ { "finish_reason": null, "index": 0, "message": { "content": "The 2020 World Series was played in Texas at Globe Life Field in Arlington.", "role": "assistant" } } ], "created": 1700584611, "id": "chatcmpl-eedbac8f-f68d-4769-a1a7-a1c550be8d08", "model": "abcd1234", "object": "chat.completion", "usage": { "completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0 } } ``` # ChatCompletions (deprecated) POST https://bridge.baseten.co/v1 Follow this step-by-step guide for using the OpenAI-compatible bridge endpoint. Use this endpoint with the OpenAI Python client and any [deployment](/deploy/lifecycle) of a compatible model deployed on Baseten. ```sh https://bridge.baseten.co/v1 ``` ### Parameters Special attention should be given to the Baseten-specific arguments that must be passed into the bridge via the `extra_body` argument. The name of the model you want to call, such as `"mistral-7b"`. A list of dictionaries containing the chat history to complete. The maximum number of tokens to generate. [Learn more](https://platform.openai.com/docs/api-reference/chat/create#chat-create-max_tokens) Set `stream=True` to stream model output. How deterministic to make the model. [Learn more](https://platform.openai.com/docs/api-reference/chat/create#chat-create-temperature) Alternative to temperature. [Learn more](https://platform.openai.com/docs/api-reference/chat/create#chat-create-top_p) Increase or decrease the model's likelihood to talk about new topics. [Learn more](https://platform.openai.com/docs/api-reference/chat/create#chat-create-presence_penalty) Python dictionary that enables extra arguments to be supplied to the request. Baseten-specific parameters that should be passed to the bridge. The arguments should be passed as a dictionary. The string identifier for the target model. The string identifier for the target deployment. When `deployment_id` is not provided, the [production deployment](/deploy/lifecycle) will be used. ### Output The output will match the ChatCompletions API output format (shown on the right) with two caveats: 1. The output `id` is just a UUID. Baseten API requests are stateless, so this ID would not be meaningful. 2. Values for the `usage` dictionary are not calculated and are set to `0`. Baseten charges for compute directly rather than charging for inference by token. ### Streaming You can also stream your model response by passing `stream=True` to the `client.chat.completions.create()` call.
To parse your output, run: ```py for chunk in response: print(chunk.choices[0].delta) ``` ```py OpenAI Python client from openai import OpenAI import os model_id = "abcd1234" # Replace with your model ID deployment_id = "4321dcba" # Optional, replace with your deployment ID client = OpenAI( api_key=os.environ["BASETEN_API_KEY"], base_url=f"https://bridge.baseten.co/{model_id}/v1" ) response = client.chat.completions.create( model="mistral-7b", messages=[ {"role": "user", "content": "Who won the world series in 2020?"}, {"role": "assistant", "content": "The Los Angeles Dodgers won the World Series in 2020."}, {"role": "user", "content": "Where was it played?"} ], extra_body={ "baseten": { "model_id": model_id, "deployment_id": deployment_id } } ) print(response.choices[0].message.content) ``` ```json Example Response { "choices": [ { "finish_reason": null, "index": 0, "message": { "content": "The 2020 World Series was played in Texas at Globe Life Field in Arlington.", "role": "assistant" } } ], "created": 1700584611, "id": "chatcmpl-eedbac8f-f68d-4769-a1a7-a1c550be8d08", "model": "abcd1234", "object": "chat.completion", "usage": { "completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0 } } ``` # API reference Details on model inference and management APIs Baseten provides two sets of API endpoints: 1. An inference API for calling deployed models 2. A management API for managing your models and workspace Many inference and management API endpoints have different routes for the three types of deployments — `development`, `production`, and individual published deployments — which are listed separately in the sidebar. ## Inference API Each model deployed on Baseten has its own subdomain on `api.baseten.co` to enable faster routing. This subdomain is used for inference endpoints, which are formatted as follows: ``` https://model-{model_id}.api.baseten.co/{deployment_type_or_id}/{endpoint} ``` Where: * `model_id` is the alphanumeric ID of the model, which you can find in your model dashboard. * `deployment_type_or_id` is one of `development`, `production`, or a separate alphanumeric ID for a specific published deployment of the model. * `endpoint` is a supported endpoint such as `predict` that you want to call. The inference API also supports [asynchronous inference](/api-reference/production-async-predict) for long-running tasks and priority queuing. ## Management API Management API endpoints all run through the base `api.baseten.co` subdomain. Use management API endpoints for monitoring, CI/CD, and building both model-level and workspace-level automations. # Production deployment POST https://model-{model_id}.api.baseten.co/production/async_predict Use this endpoint to call the [production deployment](/deploy/lifecycle) of your model asynchronously. ### Parameters The ID of the model you want to call. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body There is a 256 KiB size limit to `/async_predict` request payloads. JSON-serializable model input. Baseten **does not** store model outputs. If `webhook_endpoint` is empty, your model must save prediction outputs so they can be accessed later. URL of the webhook endpoint. We require that webhook endpoints use HTTPS. Priority of the request. A lower value corresponds to a higher priority (e.g. requests with priority 0 are scheduled before requests of priority 1). `priority` is between 0 and 2, inclusive.
Maximum time a request will spend in the queue before expiring. `max_time_in_queue_seconds` must be between 10 seconds and 72 hours, inclusive. Exponential backoff parameters used to retry the model predict request. Number of predict request attempts. `max_attempts` must be between 1 and 10, inclusive. Minimum time between retries in milliseconds. `initial_delay_ms` must be between 0 and 10,000 milliseconds, inclusive. Maximum time between retries in milliseconds. `max_delay_ms` must be between 0 and 60,000 milliseconds, inclusive. ### Response The ID of the async request. ### Rate limits Two types of rate limits apply when making async requests: * Calls to the `/async_predict` endpoint are limited to **200 requests per second**. * Each organization is limited to **50,000 `QUEUED` or `IN_PROGRESS` async requests**, summed across all deployments. If either limit is exceeded, subsequent `/async_predict` requests will receive a 429 status code. To avoid hitting these rate limits, we advise: * Implementing a backpressure mechanism, such as calling `/async_predict` with exponential backoff in response to 429 errors. * Monitoring the [async queue size metric](/observability/metrics#async-queue-size). If your model is accumulating a backlog of requests, consider increasing the number of requests your model can process at once by increasing the number of max replicas or the concurrency target in your autoscaling settings. ```py Python import requests import os model_id = "" # Replace this with your model ID webhook_endpoint = "" # Replace this with your webhook endpoint URL # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] # Call the async_predict endpoint of the production deployment resp = requests.post( f"https://model-{model_id}.api.baseten.co/production/async_predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": webhook_endpoint # Optional fields for priority, max_time_in_queue_seconds, etc }, ) print(resp.json()) ``` ```sh cURL curl --request POST \ --url https://model-{model_id}.api.baseten.co/production/async_predict \ --header "Authorization: Api-Key $BASETEN_API_KEY" \ --data '{ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook" }' ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/production/async_predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({ "model_input": {"prompt": "hello world!"}, "webhook_endpoint": "https://my_webhook.com/webhook" }), } ); const data = await resp.json(); console.log(data); ``` ```json 201 { "request_id": "" } ``` # Production deployment GET https://model-{model_id}.api.baseten.co/production/async_queue_status Use this endpoint to get the status of a production deployment's async queue. ### Parameters The ID of the model. ### Headers Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Response The ID of the model. The ID of the deployment. The number of requests in the deployment's async queue with `QUEUED` status (i.e. awaiting processing by the model). The number of requests in the deployment's async queue with `IN_PROGRESS` status (i.e. currently being processed by the model). 
```json 200 { "model_id": "", "deployment_id": "", "num_queued_requests": 12, "num_in_progress_requests": 3 } ``` ### Rate limits Calls to the `/async_queue_status` endpoint are limited to **20 requests per second**. If this limit is exceeded, subsequent requests will receive a 429 status code. To gracefully handle hitting this rate limit, we advise implementing a backpressure mechanism, such as calling `/async_queue_status` with exponential backoff in response to 429 errors. ```py Python import requests import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = requests.get( f"https://model-{model_id}.api.baseten.co/production/async_queue_status", headers={"Authorization": f"Api-Key {baseten_api_key}"} ) print(resp.json()) ``` ```sh cURL curl --request GET \ --url https://model-{model_id}.api.baseten.co/production/async_queue_status \ --header "Authorization: Api-Key $BASETEN_API_KEY" ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/production/async_queue_status', { method: 'GET', headers: { Authorization: 'Api-Key YOUR_API_KEY' } } ); const data = await resp.json(); console.log(data); ``` # Production deployment POST https://model-{model_id}.api.baseten.co/production/predict Use this endpoint to call the [production deployment](/deploy/lifecycle) of your model. ```sh https://model-{model_id}.api.baseten.co/production/predict ``` ### Parameters The ID of the model you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable model input. ```py Python import urllib3 import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/production/predict", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/production/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```sh Truss truss predict --model MODEL_ID -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/production/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by model {} ``` # Production deployment POST https://chain-{chain_id}.api.baseten.co/production/run_remote Use this endpoint to call the [production deployment](/deploy/lifecycle) of your chain. ```sh https://chain-{chain_id}.api.baseten.co/production/run_remote ``` ### Parameters The ID of the chain you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ### Body JSON-serializable chain input. The input schema corresponds to the signature of the entrypoint's `run_remote` method. I.e. The top-level keys are the argument names. The values are the corresponding JSON representation of the types. 
```py Python import urllib3 import os chain_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://chain-{chain_id}.api.baseten.co/production/run_remote", headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={}, # JSON-serializable chain input ) print(resp.json()) ``` ```sh cURL curl -X POST https://chain-{chain_id}.api.baseten.co/production/run_remote \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable chain input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://chain-{chain_id}.api.baseten.co/production/run_remote', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable chain input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // JSON-serializable output varies by chain {} ``` # Production deployment POST https://model-{model_id}.api.baseten.co/production/wake Use this endpoint to wake the [production deployment](/deploy/lifecycle) of your model if it is scaled to zero. ```sh https://model-{model_id}.api.baseten.co/production/wake ``` ### Parameters The ID of the model you want to wake. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ```py Python import urllib3 import os model_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://model-{model_id}.api.baseten.co/production/wake", headers={"Authorization": f"Api-Key {baseten_api_key}"}, ) print(resp.json()) ``` ```sh cURL curl -X POST https://model-{model_id}.api.baseten.co/production/wake \ -H 'Authorization: Api-Key YOUR_API_KEY' \ ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://model-{model_id}.api.baseten.co/production/wake', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // Returns a 202 response code {} ``` # 🆕 Promote to chain environment post /v1/chains/{chain_id}/environments/{env_name}/promote Promotes an existing chain deployment to an environment and returns the promoted chain deployment. # 🆕 Promote to model environment post /v1/models/{model_id}/environments/{env_name}/promote Promotes an existing deployment to an environment and returns the promoted deployment. # Any model deployment by ID post /v1/models/{model_id}/deployments/{deployment_id}/promote Promotes an existing deployment to production and returns the same deployment. # Development model deployment post /v1/models/{model_id}/deployments/development/promote Creates a new production deployment from the development deployment, the currently building deployment is returned. # Update model environment patch /v1/models/{model_id}/environments/{env_name} Updates an environment's settings and returns the updated environment. # Any model deployment by ID patch /v1/models/{model_id}/deployments/{deployment_id}/autoscaling_settings Updates a deployment's autoscaling settings and returns the update status. # Development model deployment patch /v1/models/{model_id}/deployments/development/autoscaling_settings Updates a development deployment's autoscaling settings and returns the update status. 
# Production model deployment patch /v1/models/{model_id}/deployments/production/autoscaling_settings Updates a production deployment's autoscaling settings and returns the update status. # Upsert a secret post /v1/secrets Creates a new secret or updates an existing secret if one with the provided name already exists. The name and creation date of the created or updated secret is returned. # Call model version POST https://app.baseten.co/model_versions/{version_id}/predict This is an old endpoint. Update to the endpoint for a [published deployment](/api-reference/deployment-predict) and the new model response format based on the [migration guide](/api-reference/migration-guide). Use this endpoint to call any model version (now known as a model deployment). ```sh https://app.baseten.co/model_versions/{version_id}/predict ``` ### Parameters The version ID of the model you want to call. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). JSON-serializable model input. ```py Python import urllib3 resp = urllib3.request( "POST", "https://app.baseten.co/model_versions/VERSION_ID/predict", headers={"Authorization": "Api-Key YOUR_API_KEY"}, json={}, # JSON-serializable model input ) print(resp.json()) ``` ```sh cURL curl -X POST https://app.baseten.co/model_versions/VERSION_ID/predict \ -H 'Authorization: Api-Key YOUR_API_KEY' \ -d '{}' # JSON-serializable model input ``` ```sh Truss truss predict --model-version VERSION_ID -d '{}' # JSON-serializable model input ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://app.baseten.co/model_versions/VERSION_ID/predict', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, body: JSON.stringify({}), // JSON-serializable model input } ); const data = await resp.json(); console.log(data); ``` ```json Example Response { "model_id":"MODEL_ID", "model_version_id":"VERSION_ID", "model_output": { // Output varies by model } } ``` # Wake model version POST https://app.baseten.co/model_versions/{version_id}/wake This is an old endpoint. Update to the wake endpoint for a [published deployment](/api-reference/deployment-wake). Use this endpoint to wake a scaled-to-zero model version (now known as a model deployment). ```sh https://app.baseten.co/model_versions/{version_id}/wake ``` ### Parameters The ID of the model version you want to wake. Your Baseten API key, formatted with prefix `Api-Key` (e.g. `{"Authorization": "Api-Key abcd1234.abcd1234"}`). ```py Python import urllib3 import os version_id = "" # Read secrets from environment variables baseten_api_key = os.environ["BASETEN_API_KEY"] resp = urllib3.request( "POST", f"https://app.baseten.co/model_versions/{version_id}/wake", headers={"Authorization": f"Api-Key {baseten_api_key}"}, ) print(resp.json()) ``` ```sh cURL curl -X POST https://app.baseten.co/model_versions/{version_id}/wake \ -H 'Authorization: Api-Key YOUR_API_KEY' \ ``` ```js Node.js const fetch = require('node-fetch'); const resp = await fetch( 'https://app.baseten.co/model_versions/{version_id}/wake', { method: 'POST', headers: { Authorization: 'Api-Key YOUR_API_KEY' }, } ); const data = await resp.json(); console.log(data); ``` ```json Example Response // Returns a 202 response code {} ``` # Chains CLI reference Details on Chains CLI Chains is part of the Truss CLI. # `push` ✨ \[new name] ```sh truss chains deploy [OPTIONS] SOURCE [ENTRYPOINT] ``` Deploys a chain remotely. 
* `SOURCE`: Path to a python file that contains the entrypoint chainlet. * `ENTRYPOINT`: Class name of the entrypoint chainlet in the source file. May be omitted if a chainlet definition in `SOURCE` is tagged with `@chains.mark_entrypoint`. Options: * `--name` (TEXT): Name of the chain to be deployed; if not given, the entrypoint name is used. * `--publish / --no-publish`: Create chainlets as a published deployment. * `--promote / --no-promote`: Promote newly deployed chainlets into production. * `--environment` (TEXT): Deploy chainlets into a particular environment. * `--wait / --no-wait`: Wait until all chainlets are ready (or deployment failed). * `--watch / --no-watch`: Watches the chain's source code and applies live patches. Using this option waits for the chain to be deployed (i.e. the `--wait` flag is applied) before starting to watch for changes. This option requires the deployment to be a development deployment. * `--dryrun`: Produces only generated files, but doesn't deploy anything. * `--remote` (TEXT): Name of the remote in .trussrc to push to. * `--user_env` (TEXT): Key-value pairs (as JSON str) that can be used to control deployment-specific chainlet behavior. * `--log` `[humanfriendly|I|INFO|D|DEBUG]`: Customizes logging. * `--help`: Show this message and exit. # `watch` ```sh truss chains watch [OPTIONS] SOURCE [ENTRYPOINT] ``` Watches the chain's source code and applies live patches to a development deployment. The development deployment must have been deployed before running this command. `SOURCE`: Path to a python file that contains the entrypoint chainlet. `ENTRYPOINT`: Class name of the entrypoint chainlet in the source file. May be omitted if a chainlet definition in SOURCE is tagged with `@chains.mark_entrypoint`. Options: * `--name` (TEXT): Name of the chain to be deployed; if not given, the entrypoint name is used. * `--remote` (TEXT): Name of the remote in .trussrc to push to. * `--user_env` (TEXT): Key-value pairs (as JSON str) that can be used to control deployment-specific chainlet behavior. * `--log [humanfriendly|W|WARNING|I|INFO|D|DEBUG]`: Customizes logging. * `--help`: Show this message and exit. # `init` ```sh truss chains init [OPTIONS] [DIRECTORY] ``` Initializes a chains project directory. * `DIRECTORY`: The name of a new or existing directory to create the chain in; it must be empty. If not specified, the current directory is used. Options: * `--log` `[humanfriendly|I|INFO|D|DEBUG]`: Customizes logging. * `--help`: Show this message and exit. # `deploy` 🚫 \[deprecated] see `push` above. # Chains reference Details on Chains CLI and configuration options [Chains](/chains/overview) is an abstraction for multi-model inference. The [Chains SDK Reference](/chains-reference/sdk) documents all public Python APIs of chains and configuration options. The [Chains CLI reference](/chains-reference/cli) details the command line interface. # Chains SDK Reference Python SDK Reference for Chains {/* This file is autogenerated, do not edit manually, see: https://github.com/basetenlabs/truss/tree/main/docs/chains/doc_gen */} # Chainlet classes APIs for creating user-defined Chainlets. ### *class* `truss_chains.ChainletBase` Base class for all chainlets. Inheriting from this class adds validations to make sure subclasses adhere to the chainlet pattern and facilitates remote chainlet deployment.
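A minimal subclass only needs a typed `run_remote` method (this sketch mirrors the `SayHello` example from the Concepts guide):

```python
import truss_chains as chains


class SayHello(chains.ChainletBase):
    # The typed public interface that other chainlets (or the chain's API endpoint) call.
    def run_remote(self, name: str) -> str:
        return f"Hello, {name}"
```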
Refer to [the docs](https://docs.baseten.co/chains/getting-started) and this [example chainlet](https://github.com/basetenlabs/truss/blob/main/truss-chains/truss_chains/example_chainlet.py) for more guidance on how to create subclasses. ### `truss_chains.depends` Sets a “symbolic marker” to indicate to the framework that a chainlet is a dependency of another chainlet. The return value of `depends` is intended to be used as a default argument in a chainlet’s `__init__`-method. When deploying a chain remotely, a corresponding stub to the remote is injected in its place. In [`run_local`](#truss-chains-run-local) mode, an instance of a local chainlet is injected. Refer to [the docs](https://docs.baseten.co/chains/getting-started) and this [example chainlet](https://github.com/basetenlabs/truss/blob/main/truss-chains/truss_chains/example_chainlet.py) for more guidance on how to make one chainlet depend on another chainlet. Despite the type annotation, this does *not* immediately provide a chainlet instance. Only when deploying remotely or using `run_local` is a chainlet instance provided. **Parameters:** | Name | Type | Description | | -------------- | --------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------ | | `chainlet_cls` | *Type\[[ChainletBase](#class-truss-chains-chainletbase)]* | The chainlet class of the dependency. | | `retries` | *int* | The number of times to retry the remote chainlet in case of failures (e.g. due to transient network issues). | | `timeout_sec` | *int* | Timeout for the HTTP request to this chainlet. | * **Returns:** A “symbolic marker” to be used as a default argument in a chainlet’s initializer. ### `truss_chains.depends_context` Sets a “symbolic marker” for injecting a context object at runtime. Refer to [the docs](https://docs.baseten.co/chains/getting-started) and this [example chainlet](https://github.com/basetenlabs/truss/blob/main/truss-chains/truss_chains/example_chainlet.py) for more guidance on the `__init__`-signature of chainlets. Despite the type annotation, this does *not* immediately provide a context instance. Only when deploying remotely or using `run_local` is a context instance provided. * **Returns:** A “symbolic marker” to be used as a default argument in a chainlet’s initializer. ### *class* `truss_chains.DeploymentContext` Bases: `pydantic.BaseModel` Bundles config values and resources needed to instantiate Chainlets. The context can optionally be added as a trailing argument in a Chainlet’s `__init__` method and then used to set up the chainlet (e.g. using a secret as an access token for downloading model weights). **Parameters:** | Name | Type | Description | | --------------------- | -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `data_dir` | *Path\|None* | The directory where the chainlet can store and access data, e.g. for downloading model weights. | | `chainlet_to_service` | *Mapping\[str,[ServiceDescriptor](#class-truss-chains-servicedescriptor)]* | A mapping from chainlet names to service descriptors. This is used to create RPC sessions to dependency chainlets. It contains only the chainlet services that are dependencies of the current chainlet. 
| | `secrets` | *Mapping\[str,str]* | A mapping from secret names to secret values. It contains only the secrets that are listed in `remote_config.assets.secret_keys` of the current chainlet. | | `environment` | *[Environment](#class-truss-chains-definitions-environment)\|None* | The environment that the chainlet is deployed in. None if the chainlet is not associated with an environment. | #### get\_baseten\_api\_key() * **Return type:** str #### get\_service\_descriptor(chainlet\_name) **Parameters:** | Name | Type | Description | | --------------- | ----- | ------------------------- | | `chainlet_name` | *str* | The name of the chainlet. | * **Return type:** [*ServiceDescriptor*](#class-truss-chains-servicedescriptor) ### *class* `truss_chains.definitions.Environment` Bases: `pydantic.BaseModel` The environment the chainlet is deployed in. * **Parameters:** **name** (*str*) – The name of the environment. ### *class* `truss_chains.ChainletOptions` Bases: `pydantic.BaseModel` **Parameters:** | Name | Type | Description | | -------------------- | ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `enable_b10_tracing` | *bool* | enables baseten-internal trace data collection. This helps baseten engineers better analyze chain performance in case of issues. It is independent of a potentially user-configured tracing instrumentation. Turning this on, could add performance overhead. | | `env_variables` | *Mapping\[str,str]* | static environment variables available to the deployed chainlet. | ### *class* `truss_chains.RPCOptions` Bases: `pydantic.BaseModel` Options to customize RPCs to dependency chainlets. **Parameters:** | Name | Type | Description | | ------------- | ----- | ----------- | | `timeout_sec` | *int* | | | `retries` | *int* | | ### `truss_chains.mark_entrypoint` Decorator to mark a chainlet as the entrypoint of a chain. This decorator can be applied to *one* chainlet in a source file and then the CLI push command simplifies because only the file, but not the chainlet class in the file, needs to be specified. Example usage: ```python import truss_chains as chains @chains.mark_entrypoint class MyChainlet(ChainletBase): ... ``` **Parameters:** | Name | Type | Description | | ----- | --------------------------------------------------------- | ------------------- | | `cls` | *Type\[[ChainletBase](#class-truss-chains-chainletbase)]* | The chainlet class. | * **Return type:** *Type*\[*ChainletBase*] # Remote Configuration These data structures specify for each chainlet how it gets deployed remotely, e.g. dependencies and compute resources. ### *class* `truss_chains.RemoteConfig` Bases: `pydantic.BaseModel` Bundles config values needed to deploy a chainlet remotely. This is specified as a class variable for each chainlet class, e.g.: ```python import truss_chains as chains class MyChainlet(chains.ChainletBase): remote_config = chains.RemoteConfig( docker_image=chains.DockerImage( pip_requirements=["torch==2.0.1", ...] 
), compute=chains.Compute(cpu_count=2, gpu="A10G", ...), assets=chains.Assets(secret_keys=["hf_access_token"], ...), ) ``` **Parameters:** | Name | Type | Description | | -------------- | -------------------------------------------------------- | ----------- | | `docker_image` | *[DockerImage](#class-truss-chains-dockerimage)* | | | `compute` | *[Compute](#class-truss-chains-compute)* | | | `assets` | *[Assets](#class-truss-chains-assets)* | | | `name` | *str\|None* | | | `options` | *[ChainletOptions](#class-truss-chains-chainletoptions)* | | ### *class* `truss_chains.DockerImage` Bases: `pydantic.BaseModel` Configures the docker image in which a remoted chainlet is deployed. Any paths are relative to the source file where `DockerImage` is defined and must be created with the helper function [`make_abs_path_here`](#truss-chains-make-abs-path-here). This allows you for example organize chainlets in different (potentially nested) modules and keep their requirement files right next their python source files. **Parameters:** | Name | Type | Description | | ----------------------- | -------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `base_image` | *[BasetenImage](#class-truss-chains-basetenimage)\|[CustomImage](#class-truss-chains-customimage)* | The base image used by the chainlet. Other dependencies and assets are included as additional layers on top of that image. You can choose a baseten default image for a supported python version (e.g. `BasetenImage.PY311`), this will also include GPU drivers if needed, or provide a custom image (e.g. `CustomImage(image="python:3.11-slim")`). | | `pip_requirements_file` | *AbsPath\|None* | Path to a file containing pip requirements. The file content is naively concatenated with `pip_requirements`. | | `pip_requirements` | *list\[str]* | A list of pip requirements to install. The items are naively concatenated with the content of the `pip_requirements_file`. | | `apt_requirements` | *list\[str]* | A list of apt requirements to install. | | `data_dir` | *AbsPath\|None* | Data from this directory is copied into the docker image and accessible to the remote chainlet at runtime. | | `external_package_dirs` | *list\[AbsPath]\|None* | A list of directories containing additional python packages outside the chain’s workspace dir, e.g. a shared library. This code is copied into the docker image and importable at runtime. | ### *class* `truss_chains.BasetenImage` Bases: `Enum` Default images, curated by baseten, for different python versions. If a Chainlet uses GPUs, drivers will be included in the image. | Enum Member | Value | | ----------- | ------- | | `PY310` | *py310* | | `PY311 ` | *py311* | | `PY39` | *py39* | ### *class* `truss_chains.CustomImage` Bases: `pydantic.BaseModel` Configures the usage of a custom image hosted on dockerhub. **Parameters:** | Name | Type | Description | | ------------------------ | -------------------------- | -------------------------------------------------------------------------------------------------------- | | `image` | *str* | Reference to image on dockerhub. 
| | `python_executable_path` | *str\|None* | Absolute path to python executable (if default `python` is ambiguous). | | `docker_auth` | *DockerAuthSettings\|None* | See [corresponding truss config](https://docs.baseten.co/truss-reference/config#base-image-docker-auth). | ### *class* `truss_chains.Compute` Specifies which compute resources a chainlet has in the *remote* deployment. Not all combinations can be exactly satisfied by available hardware, in some cases more powerful machine types are chosen to make sure requirements are met or over-provisioned. Refer to the [baseten instance reference](https://docs.baseten.co/performance/instances). **Parameters:** | Name | Type | Description | | --------------------- | ----------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- | | `cpu_count` | *int* | Minimum number of CPUs to allocate. | | `memory` | *str* | Minimum memory to allocate, e.g. “2Gi” (2 gibibytes). | | `gpu` | *str\|Accelerator\|None* | GPU accelerator type, e.g. “A10G”, “A100”, refer to the [truss config](https://docs.baseten.co/reference/config#resources-accelerator) for more choices. | | `gpu_count` | *int* | Number of GPUs to allocate. | | `predict_concurrency` | *int\|Literal\['cpu\_count']* | Number of concurrent requests a single replica of a deployed chainlet handles. | Concurrency concepts are explained in [this guide](https://docs.baseten.co/deploy/guides/concurrency#predict-concurrency). It is important to understand the difference between predict\_concurrency and the concurrency target (used for autoscaling, i.e. adding or removing replicas). Furthermore, the `predict_concurrency` of a single instance is implemented in two ways: * Via python’s `asyncio`, if `run_remote` is an async def. This requires that `run_remote` yields to the event loop. * With a threadpool if it’s a synchronous function. This requires that the threads don’t have significant CPU load (due to the GIL). ### *class* `truss_chains.Assets` Specifies which assets a chainlet can access in the remote deployment. For example, model weight caching can be used like this: ```python import truss_chains as chains from truss.base import truss_config mistral_cache = truss_config.ModelRepo( repo_id="mistralai/Mistral-7B-Instruct-v0.2", allow_patterns=["*.json", "*.safetensors", ".model"] ) chains.Assets(cached=[mistral_cache], ...) ``` See [truss caching guide](https://docs.baseten.co/deploy/guides/model-cache#enabling-caching-for-a-model) for more details on caching. **Parameters:** | Name | Type | Description | | --------------- | ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | | `cached` | *Iterable\[ModelRepo]* | One or more `truss_config.ModelRepo` objects. | | `secret_keys` | *Iterable\[str]* | Names of secrets stored on baseten, that the chainlet should have access to. You can manage secrets on baseten [here](https://app.baseten.co/settings/secrets). | | `external_data` | *Iterable\[ExternalDataItem]* | Data to be downloaded from public URLs and made available in the deployment (via `context.data_dir`). See [here](https://docs.baseten.co/reference/config#external-data) for more details. | # Core General framework and helper functions. 
### `truss_chains.push` Deploys a chain remotely (with all dependent chainlets). **Parameters:** | Name | Type | Description | | ----------------------- | --------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | | `entrypoint` | *Type\[[ChainletBase](#class-truss-chains-chainletbase)]* | The chainlet class that serves as the entrypoint to the chain. | | `chain_name` | *str* | The name of the chain. | | `publish` | *bool* | Whether to publish the chain as a published deployment (it is a draft deployment otherwise). | | `promote` | *bool* | Whether to promote the chain to be the production deployment (this implies publishing as well). | | `only_generate_trusses` | *bool* | Used for debugging purposes. If set to True, only the underlying truss models for the chainlets are generated in `/tmp/.chains_generated`. | | `remote` | *str\|None* | Name of a remote config in .trussrc. If not provided, you will be prompted for it. | | `environment` | *str\|None* | The name of an environment to promote deployment into. | * **Returns:** A chain service handle to the deployed chain. * **Return type:** [*ChainService*](#class-truss-chains-remote-chainservice) ### `truss_chains.deploy_remotely` Deprecated, use [`push`](#truss-chains-push) instead. ### *class* `truss_chains.remote.ChainService` Handle for a deployed chain. A `ChainService` is created and returned when using `push`. It bundles the individual services for each chainlet in the chain, and provides utilities to query their status, invoke the entrypoint, etc. #### get\_info() Queries the statuses of all chainlets in the chain. * **Returns:** List of `DeployedChainlet`, `(name, is_entrypoint, status, logs_url)` for each chainlet. * **Return type:** list\[*DeployedChainlet*] #### *property* name *: str* #### run\_remote(json) Invokes the entrypoint with JSON data. **Parameters:** | Name | Type | Description | | ------ | ----------- | ---------------------------- | | `json` | *JSON dict* | Input data to the entrypoint | * **Returns:** The JSON response. * **Return type:** *Any* #### *property* run\_remote\_url *: str* URL to invoke the entrypoint. #### *property* status\_page\_url *: str* Link to status page on Baseten. ### `truss_chains.make_abs_path_here` Helper to specify file paths relative to the *immediately calling* module. E.g. if you have a project structure like this: ```default root/ chain.py common_requirements.text sub_package/ chainlet.py chainlet_requirements.txt ``` You can then, in `root/sub_package/chainlet.py`, point to the requirements files like this: ```python shared = make_abs_path_here("../common_requirements.text") specific = make_abs_path_here("chainlet_requirements.txt") ``` This helper uses the directory of the immediately calling module as an absolute reference point for resolving the file location. Therefore, you MUST NOT wrap the instantiation of `make_abs_path_here` into a function (e.g. applying decorators) or use dynamic code execution. Ok: ```python def foo(path: AbsPath): abs_path = path.abs_path foo(make_abs_path_here("./somewhere")) ``` Not Ok: ```python def foo(path: str): dangerous_value = make_abs_path_here(path).abs_path foo("./somewhere") ``` **Parameters:** | Name | Type | Description | | ----------- | ----- | -------------------------- | | `file_path` | *str* | Absolute or relative path. 
| * **Return type:** *AbsPath* ### `truss_chains.run_local` Context manager for local debug execution of a chain. The arguments only need to be provided if the chainlets explicitly access any of the corresponding fields of [`DeploymentContext`](#class-truss-chains-deploymentcontext). **Parameters:** | Name | Type | Description | | --------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------- | | `secrets` | *Mapping\[str,str]\|None* | A dict of secret keys and values to provide to the chainlets. | | `data_dir` | *Path\|str\|None* | Path to a directory with data files. | | `chainlet_to_service` | *Mapping\[str,[ServiceDescriptor](#class-truss-chains-servicedescriptor)]* | A dict of chainlet names to service descriptors. | * **Return type:** *ContextManager*\[None] Example usage (as trailing main section in a chain file): ```python import os import truss_chains as chains class HelloWorld(chains.ChainletBase): ... if __name__ == "__main__": with chains.run_local( secrets={"some_token": os.environ["SOME_TOKEN"]}, chainlet_to_service={ "SomeChainlet": chains.ServiceDescriptor( name="SomeChainlet", predict_url="https://...", options=chains.RPCOptions(), ) }, ): hello_world_chain = HelloWorld() result = hello_world_chain.run_remote(max_value=5) print(result) ``` Refer to the [local debugging guide](https://docs.baseten.co/chains/guide#test-a-chain-locally) for more details. ### *class* `truss_chains.ServiceDescriptor` Bases: `pydantic.BaseModel` Bundles values to establish an RPC session to a dependency chainlet, specifically with `StubBase`. **Parameters:** | Name | Type | Description | | ------------- | ---------------------------------------------- | ----------- | | `name` | *str* | | | `predict_url` | *str* | | | `options` | *[RPCOptions](#class-truss-chains-rpcoptions)* | | ## *class* `truss_chains.StubBase` Base class for stubs that invoke remote chainlets. It is used internally for RPCs to dependency chainlets, but it can also be used in user code for wrapping a deployed truss model into the chains framework, e.g. like this: ```python import pydantic import truss_chains as chains class WhisperOutput(pydantic.BaseModel): ... class DeployedWhisper(chains.StubBase): async def run_remote(self, audio_b64: str) -> WhisperOutput: resp = await self._remote.predict_async( json_payload={"audio": audio_b64}) return WhisperOutput(text=resp["text"], language=resp["language"]) class MyChainlet(chains.ChainletBase): def __init__(self, ..., context=chains.depends_context()): ... self._whisper = DeployedWhisper.from_url( WHISPER_URL, context, options=chains.RPCOptions(retries=3), ) ``` **Parameters:** | Name | Type | Description | | -------------------- | ------------------------------------------------------------ | ----------------------------------------- | | `service_descriptor` | *[ServiceDescriptor](#class-truss-chains-servicedescriptor)* | Contains the URL and other configuration. | | `api_key` | *str* | A baseten API key to authorize requests. | #### *classmethod* from\_url(predict\_url, context, options=None) Factory method, convenient to use in a chainlet’s `__init__`-method. **Parameters:** | Name | Type | Description | | ------------- | ------------------------------------------------------------ | ----------------------------------------------------------------- | | `predict_url` | *str* | URL to predict endpoint of another chain / truss model. 
| | `context` | *[DeploymentContext](#class-truss-chains-deploymentcontext)* | Deployment context object, obtained in the chainlet’s `__init__`. | | `options` | *[RPCOptions](#class-truss-chains-rpcoptions)* | RPC options, e.g. retries. | ### *class* `truss_chains.RemoteErrorDetail` Bases: `pydantic.BaseModel` When a remote chainlet raises an exception, this pydantic model contains information about the error and stack trace and is included in JSON form in the error response. **Parameters:** | Name | Type | Description | | ----------------------- | ------------------- | ----------- | | `remote_name` | *str* | | | `exception_cls_name` | *str* | | | `exception_module_name` | *str\|None* | | | `exception_message` | *str* | | | `user_stack_trace` | *list\[StackFrame]* | | #### format() Format the error for printing, similar to how Python formats exceptions with stack traces. * **Return type:** str # Concepts Glossary of Chains concepts and terminology Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). ## Chainlet A Chainlet is the basic building block of Chains. A Chainlet is a Python class that specifies: * A set of compute resources. * A Python environment with software dependencies. * A typed interface [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) for other Chainlets to call. This is the simplest possible Chainlet — only the [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) method is required — and we can layer in other concepts to create a more capable Chainlet. ```python import truss_chains as chains class SayHello(chains.ChainletBase): def run_remote(self, name: str) -> str: return f"Hello, {name}" ``` ### Remote configuration Chainlets are meant for deployment as remote services. Each Chainlet specifies its own requirements for compute hardware (CPU count, GPU type and count, etc) and software dependencies (Python libraries or system packages). This configuration is built into a Docker image automatically as part of the deployment process. When no configuration is provided, the Chainlet will be deployed on a basic instance with one vCPU, 2GB of RAM, no GPU, and a standard set of Python and system packages. Configuration is set using the [`remote_config`](/chains-reference/sdk#remote-configuration) class variable within the Chainlet: ```python import truss_chains as chains class MyChainlet(chains.ChainletBase): remote_config = chains.RemoteConfig( docker_image=chains.DockerImage( pip_requirements=["torch==2.3.0", ...] ), compute=chains.Compute(gpu="H100", ...), assets=chains.Assets(secret_keys=["hf_access_token"], ...), ) ``` See the [remote configuration reference](/chains-reference/sdk#remote-configuration) for a complete list of options. ### Initialization Chainlets are implemented as classes because we often want to set up expensive static resources once at startup and then re-use it with each invocation of the Chainlet. For example, we only want to initialize an AI model and download its weights once then re-use it every time we run inference. We do this setup in `__init__()`, which is run exactly once when the Chainlet is deployed or scaled up. 
```python import truss_chains as chains class PhiLLM(chains.ChainletBase): def __init__(self) -> None: import torch import transformers self._model = transformers.AutoModelForCausalLM.from_pretrained( PHI_HF_MODEL, torch_dtype=torch.float16, device_map="auto", ) self._tokenizer = transformers.AutoTokenizer.from_pretrained( PHI_HF_MODEL, ) ``` Chainlet initialization also has two important features: context and dependency injection of other Chainlets, explained below. #### Context (access information) You can add [`DeploymentContext`](/chains-reference/sdk#class-truss-chains-deploymentcontext-generic-userconfigt) object as an optional argument to the `__init__`-method of a Chainlet. This allows you to use secrets within your Chainlet, such as using a `hf_access_token` to access a gated model on Hugging Face (note that when using secrets, they also need to be added to the `assets`). ```python import truss_chains as chains class MistralLLM(chains.ChainletBase): remote_config = chains.RemoteConfig( ... assets=chains.Assets(secret_keys=["hf_access_token"], ...), ) def __init__( self, # Adding the `context` argument, allows us to access secrets context: chains.DeploymentContext = chains.depends_context(), ) -> None: import transformers # Using the secret from context to access a gated model on HF self._model = transformers.AutoModelForCausalLM.from_pretrained( "mistralai/Mistral-7B-Instruct-v0.2", use_auth_token=context.secrets["hf_access_token"], ) ``` #### Depends (call other Chainlets) The Chains framework uses the [`chains.depends()`](/chains-reference/sdk#truss-chains-depends) function in Chainlets' `__init__()` method to track the dependency relationship between different Chainlets within a Chain. This syntax, inspired by dependency injection, is used to translate local Python function calls into calls to the remote Chainlets in production. Once a dependency Chainlet is added with [`chains.depends()`](/chains-reference/sdk#truss-chains-depends), its [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) method can call this dependency Chainlet, e.g. below `HelloAll` we can make calls to `SayHello`: ```python import truss_chains as chains class HelloAll(chains.ChainletBase): def __init__(self, say_hello_chainlet=chains.depends(SayHello)) -> None: self._say_hello = say_hello_chainlet def run_remote(self, names: list[str]) -> str: output = [] for name in names: output.append(self._say_hello.run_remote(name)) return "\n".join(output) ``` ## Run remote (chaining Chainlets) The `run_remote()` method is run each time the Chainlet is called. It is the sole public interface for the Chainlet (though you can have as many private helper functions as you want) and its inputs and outputs must have type annotations. In `run_remote()` you implement the actual work of the Chainlet, such as model inference or data chunking: ```python import truss_chains as chains class PhiLLM(chains.ChainletBase): def run_remote(self, messages: Messages) -> str: import torch model_inputs = self._tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = self._tokenizer(model_inputs, return_tensors="pt") input_ids = inputs["input_ids"].to("cuda") with torch.no_grad(): outputs = self._model.generate(input_ids=input_ids, **self._generate_args) output_text = self._tokenizer.decode(outputs[0], skip_special_tokens=True) return output_text ``` If `run_remote()` makes calls to other Chainlets, e.g. 
invoking a dependency Chainlet for each element in a list, you can benefit from concurrent execution, by making the `run_remote()` an `async` method and starting the calls as concurrent tasks `asyncio.ensure_future(self._dep_chainlet.run_remote(...))`. ## Entrypoint The entrypoint is called directly from the deployed Chain's API endpoint and kicks off the entire chain. The entrypoint is also responsible for returning the final result back to the client. Using the [`@chains.mark_entrypoint`](/chains-reference/sdk#truss-chains-mark-entrypoint) decorator, one Chainlet within a file is set as the entrypoint to the chain. ```python @chains.mark_entrypoint class HelloAll(chains.ChainletBase): ``` ## Stub Chains can be combined with existing Truss models using Stubs. A Stub acts as a substitute (client-side proxy) for a remotely deployed dependency, either a Chainlet or a Truss model. The Stub performs the remote invocations as if it were local by taking care of the transport layer, authentication, data serialization and retries. Stubs can be integrated into Chainlets by passing in a URL of the deployed model. They also require [`context`](/chains/concepts#context-access-information) to be initialized (for authentication). ```python import truss_chains as chains class LLMClient(chains.StubBase): async def run_remote( self, prompt: str ) -> str: # Call the deployed model resp = await self._remote.predict_async(json_payload={ "messages": [{"role": "user", "content": prompt}], "stream" : False }) # Return a string with the model output return resp["output"] LLM_URL = ... class MyChainlet(chains.ChainletBase): def __init__( self, context: chains.DeploymentContext = chains.depends_context(), ): self._llm = LLMClient.from_url(LLM_URL, context) ``` See the [StubBase reference](/chains-reference/sdk#class-truss-chains-stubbase) for details on the `StubBase` implementation. ## Pydantic data types To make orchestrating multiple remotely deployed services possible, Chains relies heavily on typed inputs and outputs. Values must be serialized to a safe exchange format to be sent over the network. The Chains framework uses the type annotations to infer how data should be serialized and currently is restricted to types that are JSON compatible. Types can be: * Direct type annotations for simple types such as `int`, `float`, or `list[str]`. * Pydantic models to define a schema for nested data structures or multiple arguments. An example of pydantic input and output types for a Chainlet is given below: ```python import enum import pydantic class Modes(enum.Enum): MODE_0 = "MODE_0" MODE_1 = "MODE_1" class SplitTextInput(pydantic.BaseModel): data: str num_partitions: int mode: Modes class SplitTextOutput(pydantic.BaseModel): parts: list[str] part_lens: list[int] ``` Refer to the [pydantic docs](https://docs.pydantic.dev/latest/) for more details on how to define custom pydantic data models. We are working on more efficient support for numeric data and bytes, for the time being a workaround for dealing with these types is to use base64-encoding and add them as a string-valued field to a pydantic model. ## Chains compared to Truss Chains is an alternate SDK for packaging and deploying AI models. It carries over many features and concepts from Truss and gives you access to the benefits of Baseten (resource provisioning, autoscaling, fast cold starts, etc), but it is not a 1-1 replacement for Truss. 
Here are some key differences: * Rather than running `truss init` and creating a Truss in a directory, a Chain is a single file, giving you more flexibility for implementing multi-step model inference. Create an example with `truss chains init`. * Configuration is done inline in typed Python code rather than in a `config.yaml` file. * While Chainlets are converted to Truss models when run on Baseten, `Chainlet != TrussModel`. Chains is designed for compatibility and incremental adoption, with a stub function for wrapping existing deployed models. # Audio Transcription Chain Transcribe hours of audio to text in a few seconds Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). [Learn more about Chains](/chains/overview). ## Prerequisites To use Chains, install a recent Truss version and ensure pydantic is v2: ```bash pip install --upgrade truss 'pydantic>=2.0.0' ``` Truss requires python `>=3.8,<3.13`. To set up a fresh development environment, you can use the following commands, creating an environment named `chains_env` using `pyenv`: ```bash curl https://pyenv.run | bash echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc echo '[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc echo 'eval "$(pyenv init -)"' >> ~/.bashrc source ~/.bashrc pyenv install 3.11.0 ENV_NAME="chains_env" pyenv virtualenv 3.11.0 $ENV_NAME pyenv activate $ENV_NAME pip install --upgrade truss 'pydantic>=2.0.0' ``` To deploy Chains remotely, you also need a [Baseten account](https://app.baseten.co/signup). It is handy to export your API key to the current shell session or permanently in your `.bashrc`: ```bash ~/.bashrc export BASETEN_API_KEY="nPh8..." ``` # Overview This example shows how to transcribe audio media files to text blazingly fast and at high quality using a Chain. To achieve this we will combine a number of methods: * Partitioning large input files (10h+) into smaller chunks. * Analyzing the audio for silence to find optimal split points of the chunks. * Distributing the chunk tasks across auto-scaling Baseten deployments. * Using batching with a highly optimized transcription model to maximize GPU utilization. * Range downloads and pipelining of audio extraction to minimize latency. * `asyncio` for concurrent execution of tasks. The implementation is quite a bit of code, located in the [Chains examples repo](https://github.com/basetenlabs/truss/tree/main/truss-chains/examples/audio-transcription). This guide is a commentary on the code, pointing out critical parts or explaining design choices. If you want to try out this Chain and create a customized version of it, check out the [try it yourself section](#try-it-yourself) below. ## The Chain structure The chunking has a 2-step hierarchy: "macro chunks" partition the full media into segments in the range of \~300s. This ensures that for very long files, the workload of a single `MacroChunkWorker` is limited by that duration and the source data for the different macro chunks is downloaded in parallel, making processing very long files much faster. For shorter inputs, there will be only a single "macro chunk". "micro chunks" have durations in the range of 5-30s. These are sent to the transcription model. More details are given in the explanations of the Chainlets below. The `WhisperModel` is split off from the transcription Chain. This is optional, but has some advantages: * A lot of "business logic", which might be changed more frequently, is implemented in the Chain. 
When developing or changing the Chain and making frequent re-deployments, it's a faster dev loop not to re-deploy the Whisper model, since re-deploying it, as a large GPU model with heavy dependencies, is slower. * The Whisper model can be used in other Chains, or standalone, if it's not part of this Chain. Specifically, the same model can be used by the dev and prod versions of a Chain - otherwise a separate Whisper model would need to be deployed with each environment. * When making changes and improvements to the Whisper model, the development can be split off from the development of the Chain - think of a separation of concerns into high-level (the Chain) and low-level (the model) development. More information on how to use and deploy non-Chain models within a Chain is given in the [WhisperModel section](#whispermodel) below. ### `Transcribe` This Chainlet is the "entrypoint" to the Chain; external clients send transcription requests to it. Its endpoint implementation has the following signature: ```python async def run_remote( self, media_url: str, params: data_types.TranscribeParams ) -> data_types.TranscribeOutput: ``` The input arguments are separated into `media_url`, the audio source to work on, and `params` that control the execution, e.g. the chunk sizes. You can find the exact schemas and docstrings of these arguments in [data\_types.py](https://github.com/basetenlabs/truss/blob/main/truss-chains/examples/transcribe/data_types.py). An example request looks like this: ```bash curl -X POST $INVOCATION_URL \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '' ``` with JSON input: ```json { "media_url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/TearsOfSteel.mp4", "params": { "micro_chunk_size_sec": 30, "macro_chunk_size_sec": 300 } } ``` The output looks like this (truncated): ```json { "segments": [ ... { "start_time_sec": 517.9465, "end_time_sec": 547.70975, "text": "The world's changed, Celia. Maybe we can too. Memory override complete!", "language": "english", "bcp47_key": "en" }, { "start_time_sec": 547.70975, "end_time_sec": 567.0716874999999, "text": "You know, there's a lesson to be learned from this. Could've gone worse.", "language": "english", "bcp47_key": "en" }, ... ], "input_duration_sec": 734.261406, "processing_duration_sec": 82.42135119438171, "speedup": 8.908631020478238 } ``` The `Transcribe` Chainlet does the following: * Asserts that the media URL supports range downloads. This is usually a given for video / audio hosting services. * Uses `FFMPEG` to query the length of the medium (both video and audio files are supported). * Generates a list of "macro chunks", defined by their start and end times. The length is defined by `macro_chunk_size_sec` in `TranscribeParams`. This will soon be upgraded to find silence-aware split points, so that a chunk does not end in the middle of a spoken word. To do this, a small segment around the desired chunk boundary is downloaded (e.g. +/- 5 seconds) and the most silent timestamp within it is determined. * Sends the media URL with chunk limits as "tasks" to `MacroChunkWorker`. Using `asyncio.ensure_future`, these tasks are dispatched concurrently - meaning that the loop over the chunks does not wait for each chunk to complete first before dispatching the task for the next chunk. These "calls" are network requests (RPCs) to the `MacroChunkWorker` Chainlet which runs on its own deployment and can auto-scale, depending on the load. 
* Once all tasks are dispatched, it waits for the results and concatenates all the partial transcriptions from the chunks to a final output. ### `MacroChunkWorker` The `MacroChunkWorker` Chainlet works on chunk tasks it receives from the `Transcribe` Chainlet. For each chunk it does the following: * It starts a `DownloadSubprocess` asynchronously (i.e. this will need CPU on the machine, but not block the event loop of the main process, making it possible to serve multiple concurrent requests). * In `DownloadSubprocess`, `FFMPEG` is used to download the relevant time range from the source. It extracts the audio wave form and streams the raw wave `bytes` back to the main process. This happens on-the-fly (i.e. not waiting for the full download to complete) - so the initial latency until receiving wave bytes is minimized. Furthermore, it resamples the wave form to the sampling rate expected by the transcription model and averages multichannel audio to a mono signal. {/* One detail is that when streaming the wave bytes to the main process, we need to intercept the wave metadata from the header. There is a function in `helpers.py` for this: `_extract_wav_info`. Quite a lot of case distinctions and logging is done for error handling and resource cleanup in case of failures, e.g. in the exiting of the `DownloadSubprocess`-context. */} * The main process reads as many bytes from the wave stream as needed for `micro_chunk_size_sec` (5-30s). * A helper function `_find_silent_split_point` analyzes the wave form to find the most silent point in the *second half* of the chunk. E.g. if the `micro_chunk_size_sec` is 5s, then it searches for the most silent point between 2.5 and 5.0s and uses this time to partition the chunk. * The wave bytes are converted to wave file format (i.e. including metadata in the header) and then b64-encoded, so they can be sent as JSON via HTTP. * For each b64-encoded "micro" chunk, the transcription model is invoked. * Like in the `Transcribe` Chainlet, these tasks are concurrent RPCs, and the transcription model deployment can auto-scale with the load. * Finally, we wait for all "micro chunk" results, concatenate them to a "macro chunk" result and return it to `Transcribe`. ### `WhisperModel` As mentioned in the [structure section](#the-chain-structure), the `WhisperModel` is separately deployed from the transcription Chain. In the Chain implementation we only need to define a small "adapter" class `WhisperModel`, mainly for integrating the I/O types of that model with our Chain. This is a subclass of `chains.StubBase` which abstracts sending requests, retries etc. away from us (this class is also used for all RPCs that the Chains framework makes internally). Furthermore, we need to take the invocation URL of that model (e.g. `https://model-5woz91z3.api.baseten.co/production/predict`) and pass it along when initializing this adapter class with the `from_url` factory-method. There are two options for deploying a model separately from a Chain: **As a Chainlet** This is done in this example. As a Chainlet it can even be in the same file, but not "wired" into the Chain with the `chains.depends`-directive. In this example we put it into a separate file `whisper_chainlet.py`. * It will not be included in the deployment when running the `truss chains deploy transcribe.py` command for the entrypoint, since it's not formally a tracked dependency of that Chain. * It is separately deployed, with a deploy command specifically targeting that class, i.e. `truss chains push whisper_chainlet.py`. 
Using a structure like this has the advantage of high code coherence, e.g. the pydantic models for the input and output are shared in both files (defined in the common `data_types.py`), while still allowing independent deployment cycles. **As a conventional Truss model** This is not done in this example. This could be anything, from the [model library](https://www.baseten.co/library/), the [Truss examples repo](https://github.com/basetenlabs/truss-examples) or your [own Truss model](https://truss.baseten.co/quickstart). This might be the better choice if the model has a substantial code base itself and if you want to avoid mixing that (and the development of it) with the Chain code. # Performance considerations Even for very large files, e.g. 10h+, the end-to-end runtime is still bounded: since the `macro_chunk_size_sec` is fixed, each sub-task has a bounded runtime. So provided all Chainlet components have enough resources to auto-scale horizontally and the network bandwidth of the source hosting is sufficient, the overall runtime is still relatively small. Note that auto-scaling, e.g. of the transcription model, to a large number of replicas can take a while, so you'll only see the full speedup after a "warm-up" phase. Depending on the distribution of your input durations and the "spikiness" of your traffic, there are a few knobs to tweak: * `micro_chunk_size_sec`: "micro" chunks that are too small create more overhead and leave the GPU underutilized, while with chunks that are too large the processing of a single chunk might take too long or overflow the GPU model \-- the sweet spot is in the middle. * `macro_chunk_size_sec`: larger chunks mean less overhead, but also less download parallelism. * Predict-concurrency and autoscaling settings of all deployed components. Specifically, make sure that the WhisperModel can scale up to enough replicas (but should also not be underutilized). Look at the GPU and CPU utilization metrics of the deployments. # Try it yourself If you want to try this yourself, follow the steps below: All code can be found and copied in this [example directory](https://github.com/basetenlabs/truss/tree/main/truss-chains/examples/audio-transcription). * Download the example code. * Deploy the Whisper Chainlet first: `truss chains push whisper_chainlet.py`. * Note the invocation URL of the form `https://chain-{chain_id}.api.baseten.co/production/run_remote` and insert that URL as a value for `WHISPER_URL` in `transcribe.py`. You can find the URL in the output of the push command or on the status page. * Deploy the transcription Chain with `truss chains push transcribe.py`. As media source URL, you can pass either video or audio sources, as long as the format can be handled by `FFMPEG` and the hosted file supports range downloads. A public test file you can use is shown in the example below. ```bash curl -X POST $INVOCATION_URL \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '' ``` with JSON input: ```json { "media_url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/TearsOfSteel.mp4", "params": { "micro_chunk_size_sec": 30, "macro_chunk_size_sec": 300 } } ``` # RAG Chain Build a RAG (retrieval-augmented generation) pipeline with Chains Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). [Learn more about Chains](/chains/overview) ## Prerequisites To use Chains, install a recent Truss version and ensure pydantic is v2: ```bash pip install --upgrade truss 'pydantic>=2.0.0' ``` Truss requires python `>=3.8,<3.13`. 
To set up a fresh development environment, you can use the following commands, creating a environment named `chains_env` using `pyenv`: ```bash curl https://pyenv.run | bash echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc echo '[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc echo 'eval "$(pyenv init -)"' >> ~/.bashrc source ~/.bashrc pyenv install 3.11.0 ENV_NAME="chains_env" pyenv virtualenv 3.11.0 $ENV_NAME pyenv activate $ENV_NAME pip install --upgrade truss 'pydantic>=2.0.0' ``` To deploy Chains remotely, you also need a [Baseten account](https://app.baseten.co/signup). It is handy to export your API key to the current shell session or permanently in your `.bashrc`: ```bash ~/.bashrc export BASETEN_API_KEY="nPh8..." ``` If you want to run this example in [local debugging mode](/chains/guide#test-a-chain-locally), you'll also need to install chromadb: ```shell pip install chromadb ``` The complete code used in this tutorial can also be found in the [Chains examples repo](https://github.com/basetenlabs/truss/tree/main/truss-chains/examples/rag). # Overview Retrieval-augmented generation (RAG) is a multi-model pipeline for generating context-aware answers from LLMs. There are a number of ways to build a RAG system. This tutorial shows a minimum viable implementation with a basic vector store and retrieval function. It's intended as a starting point to show how Chains helps you flexibly combine model inference and business logic. In this tutorial, we'll build a simple RAG pipeline for a hypothetical alumni matching service for a university. The system: 1. Takes a bio with information about a new graduate 2. Uses a vector database to retrieve semantically similar bios of other alums 3. Uses an LLM to explain why the new graduate should meet the selected alums 4. Returns the writeup from the LLM Let's dive in! ## Building the Chain Create a file `rag.py` in a new directory with: ```sh mkdir rag touch rag/rag.py cd rag ``` Our RAG Chain is composed of three parts: * `VectorStore`, a Chainlet that implements a vector database with a retrieval function. * `LLMClient`, a Stub for connecting to a deployed LLM. * `RAG`, the entrypoint Chainlet that orchestrates the RAG pipeline and has `VectorStore` and `LLMClient` as dependencies. We'll examine these components one by one and then see how they all work together. ### Vector store Chainlet A real production RAG system would use a hosted vector database with a massive number of stored embeddings. For this example, we're using a small local vector store built with `chromadb` to stand in for a more complex system. The Chainlet has three parts: * [`remote_config`](/chains-reference/sdk#remote-configuration), which configures a Docker image on deployment with dependencies. * `__init__()`, which runs once when the Chainlet is spun up, and creates the vector database with ten sample bios. * [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets), which runs each time the Chainlet is called and is the sole public interface for the Chainlet. ```python rag/rag.py import truss_chains as chains # Create a Chainlet to serve as our vector database. class VectorStore(chains.ChainletBase): # Add chromadb as a dependency for deployment. remote_config = chains.RemoteConfig( docker_image=chains.DockerImage( pip_requirements=["chromadb"] ) ) # Runs once when the Chainlet is deployed or scaled up. def __init__(self): # Import Chainlet-specific dependencies in init, not at the top of # the file. 
import chromadb self._chroma_client = chromadb.EphemeralClient() self._collection = self._chroma_client.create_collection(name="bios") # Sample documents are hard-coded for your convenience documents = [ "Angela Martinez is a tech entrepreneur based in San Francisco. As the founder and CEO of a successful AI startup, she is a leading figure in the tech community. Outside of work, Angela enjoys hiking the trails around the Bay Area and volunteering at local animal shelters.", "Ravi Patel resides in New York City, where he works as a financial analyst. Known for his keen insight into market trends, Ravi spends his weekends playing chess in Central Park and exploring the city's diverse culinary scene.", "Sara Kim is a digital marketing specialist living in San Francisco. She helps brands build their online presence with creative strategies. Outside of work, Sara is passionate about photography and enjoys hiking the trails around the Bay Area.", "David O'Connor calls New York City his home and works as a high school teacher. He is dedicated to inspiring the next generation through education. In his free time, David loves running along the Hudson River and participating in local theater productions.", "Lena Rossi is an architect based in San Francisco. She designs sustainable and innovative buildings that contribute to the city's skyline. When she's not working, Lena enjoys practicing yoga and exploring art galleries.", "Akio Tanaka lives in Tokyo and is a software developer specializing in mobile apps. Akio is an avid gamer and enjoys attending eSports tournaments. He also has a passion for cooking and often experiments with new recipes in his spare time.", "Maria Silva is a nurse residing in New York City. She is dedicated to providing compassionate care to her patients. Maria finds joy in gardening and often spends her weekends tending to her vibrant flower beds and vegetable garden.", "John Smith is a journalist based in San Francisco. He reports on international politics and has a knack for uncovering compelling stories. Outside of work, John is a history buff who enjoys visiting museums and historical sites.", "Aisha Mohammed lives in Tokyo and works as a graphic designer. She creates visually stunning graphics for a variety of clients. Aisha loves to paint and often showcases her artwork in local exhibitions.", "Carlos Mendes is an environmental engineer in San Francisco. He is passionate about developing sustainable solutions for urban areas. In his leisure time, Carlos enjoys surfing and participating in beach clean-up initiatives." ] # Add all documents to the database self._collection.add( documents=documents, ids=[f"id{n}" for n in range(len(documents))] ) # Runs each time the Chainlet is called async def run_remote(self, query: str) -> list[str]: # This query call also includes embedding the query string. results = self._collection.query(query_texts=[query], n_results=2) if results is None or not results: raise ValueError("No bios returned from the query") if not results["documents"] or not results["documents"][0]: raise ValueError("Bios are empty") return results["documents"][0] ``` ### LLM inference stub Now that we can retrieve relevant bios from the vector database, we need to pass that information to an LLM to generate our final output. Chains can integrate previously deployed models using a Stub. Like Chainlets, Stubs implement [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets), but as a call to the deployed model.
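In general, a Stub subclass wraps a deployed model's predict endpoint behind a typed `run_remote()` method. The sketch below shows this general shape (the class name and the `"prompt"`/`"output"` payload keys are illustrative and depend on the model you call); the concrete `LLMClient` Stub for this tutorial follows.

```python
import truss_chains as chains


class DeployedModelClient(chains.StubBase):
    # `self._remote` is wired up by `from_url(...)` and forwards calls to the
    # deployed model's predict endpoint.
    async def run_remote(self, text: str) -> str:
        resp = await self._remote.predict_async(json_payload={"prompt": text})
        return resp["output"]
```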
For our LLM, we'll use Phi-3 Mini Instruct, a small-but-mighty open-source LLM that you can deploy with one click from Baseten's model library. While the model is deploying, be sure to note down the model's invocation URL from the model dashboard for use in the next step. To use our deployed LLM in the RAG Chain, we define a Stub: ```python rag/rag.py class LLMClient(chains.StubBase): # Runs each time the Stub is called async def run_remote(self, new_bio: str, bios: list[str]) -> str: # Use the retrieved bios to augment the prompt -- here's the "A" in RAG! prompt = f"""You are matching alumni of a college to help them make connections. Explain why the person described first would want to meet the people selected from the matching database. Person you're matching: {new_bio} People from database: {" ".join(bios)}""" # Call the deployed model. resp = await self._remote.predict_async(json_payload={ "messages": [{"role": "user", "content": prompt}], "stream": False }) return resp["output"][len(prompt) :].strip() ``` ### RAG entrypoint Chainlet The entrypoint to a Chain is the Chainlet that specifies the public-facing input and output of the Chain and orchestrates calls to dependencies. The `__init__` function in this Chainlet takes two new arguments: * Add dependencies to any Chainlet with [`chains.depends()`](/chains-reference/sdk#truss-chains-depends). Only Chainlets, not Stubs, need to be added in this fashion. * Use [`chains.depends_context()`](/chains-reference/sdk#truss-chains-depends-context) to inject a context object at runtime. This context object is required to initialize the `LLMClient` stub. * Visit your [Baseten workspace](https://app.baseten.co/models) to find the URL of the previously deployed Phi-3 model and insert it as the value for `LLM_URL`. ```python rag/rag.py # Insert the URL from the previously deployed Phi-3 model. LLM_URL = ... @chains.mark_entrypoint class RAG(chains.ChainletBase): # Runs once when the Chainlet is spun up def __init__( self, # Declare dependency chainlets. vector_store: VectorStore = chains.depends(VectorStore), context: chains.DeploymentContext = chains.depends_context(), ): self._vector_store = vector_store # The stub needs the context for setting up authentication. self._llm = LLMClient.from_url(LLM_URL, context) # Runs each time the Chain is called async def run_remote(self, new_bio: str) -> str: # Use the VectorStore Chainlet for context retrieval. bios = await self._vector_store.run_remote(new_bio) # Use the LLMClient Stub for augmented generation. contacts = await self._llm.run_remote(new_bio, bios) return contacts ``` ## Testing locally Because our Chain uses a Stub for the LLM call, we can run the whole Chain locally without any GPU resources. Before running the Chainlet, make sure to set your Baseten API key as the environment variable `BASETEN_API_KEY`. ```python rag/rag.py if __name__ == "__main__": import os import asyncio with chains.run_local( # This secret is needed even locally, because part of this chain # calls the separately deployed Phi-3 model. Only the Chainlets # actually run locally. secrets={"baseten_chain_api_key": os.environ["BASETEN_API_KEY"]} ): rag_client = RAG() result = asyncio.get_event_loop().run_until_complete( rag_client.run_remote( """ Sam just moved to Manhattan for his new job at a large bank. In college, he enjoyed building sets for student plays.
""" ) ) print(result) ``` We can run our Chain locally: ```sh python rag.py ``` After a few moments, we should get a recommendation for why Sam should meet the alumni selected from the database. ## Deploying to production Once we're satisfied with our Chain's local behavior, we can deploy it to production on Baseten. To deploy the Chain, run: ```sh truss chains push rag.py ``` This will deploy our Chain as a development deployment. Once the Chain is deployed, we can call it from its API endpoint. You can do this in the console with cURL: ```sh curl -X POST 'https://chain-5wo86nn3.api.baseten.co/development/run_remote' \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '{"new_bio": "Sam just moved to Manhattan for his new job at a large bank.In college, he enjoyed building sets for student plays."}' ``` Alternatively, you can also integrate this in a Python application: ```python call_chain.py import requests import os # Insert the URL from the deployed rag chain. You can get it from the CLI # output or the status page, e.g. # "https://chain-6wgeygoq.api.baseten.co/production/run_remote". RAG_CHAIN_URL = "" baseten_api_key = os.environ["BASETEN_API_KEY"] if not RAG_CHAIN_URL: raise ValueError("Please insert the URL for the RAG chain.") resp = requests.post( RAG_CHAIN_URL, headers={"Authorization": f"Api-Key {baseten_api_key}"}, json={"new_bio": new_bio}, ) print(resp.json()) ``` When we're happy with the deployed Chain, we can promote it to production via the UI or by running: ```sh truss chains push --promote rag.py ``` Once in production, the Chain will have access to full autoscaling settings. Both the development and production deployments will scale to zero when not in use. # Build your first Chain Build and deploy two example Chains Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). This quickstart guide contains instructions for creating two Chains: 1. A simple CPU-only “hello world”-Chain. 2. A Chain that implements Phi-3 Mini and uses it to write poems. ## Prerequisites To use Chains, install a recent Truss version and ensure pydantic is v2: ```bash pip install --upgrade truss 'pydantic>=2.0.0' ``` Truss requires python `>=3.8,<3.13`. To set up a fresh development environment, you can use the following commands, creating a environment named `chains_env` using `pyenv`: ```bash curl https://pyenv.run | bash echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc echo '[[ -d $PYENV_ROOT/bin ]] && export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc echo 'eval "$(pyenv init -)"' >> ~/.bashrc source ~/.bashrc pyenv install 3.11.0 ENV_NAME="chains_env" pyenv virtualenv 3.11.0 $ENV_NAME pyenv activate $ENV_NAME pip install --upgrade truss 'pydantic>=2.0.0' ``` To deploy Chains remotely, you also need a [Baseten account](https://app.baseten.co/signup). It is handy to export your API key to the current shell session or permanently in your `.bashrc`: ```bash ~/.bashrc export BASETEN_API_KEY="nPh8..." ``` ## Example: Hello World Chains are written in Python files. In your working directory, create `hello_chain/hello.py`: ```sh mkdir hello_chain cd hello_chain touch hello.py ``` In the file, we'll specify a basic Chain. It has two Chainlets: * `HelloWorld`, the entrypoint, which handles the input and output. * `RandInt`, which generates a random integer. It is used a as a dependency by `HelloWorld`. Via the entrypoint, the Chain takes a maximum value and returns the string " Hello World!" repeated a variable number of times. 
```python hello.py import random import truss_chains as chains class RandInt(chains.ChainletBase): def run_remote(self, max_value: int) -> int: return random.randint(1, max_value) @chains.mark_entrypoint class HelloWorld(chains.ChainletBase): def __init__(self, rand_int=chains.depends(RandInt, retries=3)) -> None: self._rand_int = rand_int def run_remote(self, max_value: int) -> str: num_repetitions = self._rand_int.run_remote(max_value) return "Hello World! " * num_repetitions ``` ### The Chainlet class-contract Exactly one Chainlet must be marked as the entrypoint with the [`@chains.mark_entrypoint`](/chains-reference/sdk#truss-chains-mark-entrypoint) decorator. This Chainlet is responsible for handling public-facing input and output for the whole Chain in response to an API call. A Chainlet class has a single public method, [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets), which is the API endpoint for the entrypoint Chainlet and the function that other Chainlets can use as a dependency. The [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) method must be fully type-annotated with primitive python types or [pydantic models](https://docs.pydantic.dev/latest/). Chainlets cannot be naively instantiated. The only correct usages are: 1. Make one Chainlet depend on another one via the [`chains.depends()`](/chains-reference/sdk#truss-chains-depends) directive as an `__init__`-argument as shown above for the `RandInt` Chainlet. 2. In the [local debugging mode](/chains/guide#test-a-chain-locally). Beyond that, you can structure your code as you like, with private methods, imports from other files, and so forth. Keep in mind that Chainlets are intended for distributed, replicated, remote execution, so using global variables, global state, and certain Python features like importing modules dynamically at runtime should be avoided as they may not work as intended. ### Deploy your Chain to Baseten To deploy your Chain to Baseten, run: ```bash truss chains push hello.py ``` The deploy command results in an output like this: ``` ⛓️ HelloWorld - Chainlets ⛓️ ╭──────────────────────┬─────────────────────────┬─────────────╮ │ Status │ Name │ Logs URL │ ├──────────────────────┼─────────────────────────┼─────────────┤ │ 💚 ACTIVE │ HelloWorld (entrypoint) │ https://... │ ├──────────────────────┼─────────────────────────┼─────────────┤ │ 💚 ACTIVE │ RandInt (dep) │ https://... │ ╰──────────────────────┴─────────────────────────┴─────────────╯ Deployment succeeded. You can run the chain with: curl -X POST 'https://chain-.../run_remote' \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '' ``` Wait for the status to turn to `ACTIVE` and test invoking your Chain (replace `$INVOCATION_URL` in below command): ```bash curl -X POST $INVOCATION_URL \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '{"max_value": 10}' # "Hello World! Hello World! Hello World! " ``` ## Example: Poetry with LLMs Our second example also has two Chainlets, but is somewhat more complex and realistic. The Chainlets are: * `PoemGenerator`, the entrypoint, which handles the input and output and orchestrates calls to the LLM. * `PhiLLM`, which runs inference on Phi-3 Mini. This Chain takes a list of words and returns a poem about each word, written by Phi-3. 
Here's the architecture: We build this Chain in a new working directory (if you are still inside `hello_chain/`, go up one level with `cd ..` first): ```sh mkdir poetry_chain cd poetry_chain touch poems.py ``` A similar end-to-end code example, using Mistral as an LLM, is available in the [examples repo](https://github.com/basetenlabs/truss/tree/main/truss-chains/examples/mistral). ### Building the LLM Chainlet The main difference between this Chain and the previous one is that we now have an LLM that needs a GPU and more complex dependencies. Copy the following code into `poems.py`: ```python poems.py import asyncio from typing import List import pydantic import truss_chains as chains from truss import truss_config PHI_HF_MODEL = "microsoft/Phi-3-mini-4k-instruct" # This configures caching of the model weights from the Hugging Face repo # in the docker image that is used for deploying the Chainlet. PHI_CACHE = truss_config.ModelRepo( repo_id=PHI_HF_MODEL, allow_patterns=["*.json", "*.safetensors", ".model"] ) class Messages(pydantic.BaseModel): messages: List[dict[str, str]] class PhiLLM(chains.ChainletBase): # `remote_config` defines the resources required for this chainlet. remote_config = chains.RemoteConfig( docker_image=chains.DockerImage( # The phi model needs some extra python packages. pip_requirements=[ "accelerate==0.30.1", "einops==0.8.0", "transformers==4.41.2", "torch==2.3.0", ] ), # The phi model needs a GPU and more CPUs. compute=chains.Compute(cpu_count=2, gpu="T4"), # Cache the model weights in the image assets=chains.Assets(cached=[PHI_CACHE]), ) def __init__(self) -> None: # Note the imports of the *specific* python requirements are # pushed down to here. This code will only be executed on the # remotely deployed Chainlet, not in the local environment, # so we don't need to install these packages in the local # dev environment. import torch import transformers self._model = transformers.AutoModelForCausalLM.from_pretrained( PHI_HF_MODEL, torch_dtype=torch.float16, device_map="auto", ) self._tokenizer = transformers.AutoTokenizer.from_pretrained( PHI_HF_MODEL, ) self._generate_args = { "max_new_tokens": 512, "temperature": 1.0, "top_p": 0.95, "top_k": 50, "repetition_penalty": 1.0, "no_repeat_ngram_size": 0, "use_cache": True, "do_sample": True, "eos_token_id": self._tokenizer.eos_token_id, "pad_token_id": self._tokenizer.pad_token_id, } async def run_remote(self, messages: Messages) -> str: import torch # Pass the raw list of message dicts (not the pydantic wrapper) to the chat template. model_inputs = self._tokenizer.apply_chat_template( messages.messages, tokenize=False, add_generation_prompt=True ) inputs = self._tokenizer(model_inputs, return_tensors="pt") input_ids = inputs["input_ids"].to("cuda") with torch.no_grad(): outputs = self._model.generate(input_ids=input_ids, **self._generate_args) output_text = self._tokenizer.decode(outputs[0], skip_special_tokens=True) return output_text ``` ### Building the entrypoint Now that we have an LLM, we can use it in a poem generator Chainlet. Add the following code to `poems.py`: ```python poems.py @chains.mark_entrypoint class PoemGenerator(chains.ChainletBase): def __init__(self, phi_llm: PhiLLM = chains.depends(PhiLLM)) -> None: self._phi_llm = phi_llm async def run_remote(self, words: list[str]) -> list[str]: tasks = [] for word in words: messages = Messages( messages=[ { "role": "system", "content": ( "You are a poet who writes short, " "lighthearted, amusing poetry."
), }, {"role": "user", "content": f"Write a poem about {word}"}, ] ) tasks.append(asyncio.ensure_future(self._phi_llm.run_remote(messages))) return list(await asyncio.gather(*tasks)) ``` Note that we use `asyncio.ensure_future` around each RPC to the LLM chainlet. This makes the current python process start these remote calls concurrently, i.e. the next call is started before the previous one has finished and we can minimize our overall runtime. In order to await the results of all calls, `asyncio.gather` is used which gives us back normal python objects. If the LLM is hit with many concurrent requests, it can auto-scale up (if autoscaling is configure). More advanced LLM models have batching capabilities, so for those even a single instance can serve concurrent request. ### Deploy your Chain to Baseten To deploy your Chain to Baseten, run: ```bash truss chains push poems.py ``` Wait for the status to turn to `ACTIVE` and test invoking your Chain (replace `$INVOCATION_URL` in below command): ```bash curl -X POST $INVOCATION_URL \ -H "Authorization: Api-Key $BASETEN_API_KEY" \ -d '{"words": ["bird", "plane", "superman"]}' #[[ #" [INST] Generate a poem about: bird [/INST] In the quiet hush of...", #" [INST] Generate a poem about: plane [/INST] In the vast, boudl...", #" [INST] Generate a poem about: superman [/INST] In the realm where..." #]] ``` # User Guides Using the full potential of Chains Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). ## Designing the architecture of a Chain A Chain is composed of multiple connecting Chainlets working together to perform a task. For example, the Chain in the diagram below takes a large audio file. Then it splits it into smaller chunks, transcribes each chunk in parallel to speed up the transcription process, and finally aggregates and returns the results. To build an efficient end-to-end Chain, we recommend drafting your high level structure as a flowchart or diagram. This will help you identify the Chainlets needed and how to link them. If one Chainlet creates many "sub-tasks" by calling other dependency Chainlets (e.g. in a loop over partial work items), these calls should be done as `aynscio`-tasks that run concurrently. That way you get the most out of the parallelism that Chains offers. This design pattern is extensively used in the [audio transcription example](/chains/examples/audio-transcription). ## Local development Chains are designed for production in replicated remote deployments. But alongside that production-ready power, we need great local development and deployment experiences. Chains exists to help you build multi-step, multi-model pipelines. The abstractions that Chains introduces are based on six opinionated principles: three for architecture and three for developer experience. **Architecture principles** Each step in the pipeline can set its own hardware requirements and software dependencies, separating GPU and CPU workloads. Each component has independent autoscaling parameters for targeted resource allocation, removing bottlenecks from your pipelines. Components specify a single public interface for flexible-but-safe composition and are reusable between projects **Developer experience principles** Eliminate entire taxonomies of bugs by writing typed Python code and validating inputs, outputs, module initializations, function signatures, and even remote server configurations. 
Seamless local testing and cloud deployments: test Chains locally with support for mocking the output of any step and simplify your cloud deployment loops by separating large model deployments from quick updates to glue code. Use Chains to orchestrate existing model deployments, like pre-packaged models from Baseten’s model library, alongside new model pipelines built entirely within Chains. Locally, a Chain is just Python files in a source tree. While that gives you a lot of flexibility in how you structure your code, there are some constraints and rules to follow to ensure successful distributed, remote execution in production. The best thing you can do while developing locally with Chains is to run your code frequently, even if you do not have a `__main__` section: the Chains framework runs various validations at module initialization to help you catch issues early. Additionally, running `mypy` and fixing reported type errors can help you find problems early and in a rapid feedback loop, before attempting a (much slower) deployment. Complementary to purely local development, Chains also has a "watch" mode, like Truss; see the [watch section below](#chains-watch). ### Test a Chain locally Let's revisit our "Hello World" Chain: ```python hello_chain/hello.py import asyncio import truss_chains as chains # This Chainlet does the work class SayHello(chains.ChainletBase): async def run_remote(self, name: str) -> str: return f"Hello, {name}" # This Chainlet orchestrates the work @chains.mark_entrypoint class HelloAll(chains.ChainletBase): def __init__(self, say_hello_chainlet=chains.depends(SayHello)) -> None: self._say_hello = say_hello_chainlet async def run_remote(self, names: list[str]) -> str: tasks = [] for name in names: tasks.append(asyncio.ensure_future( self._say_hello.run_remote(name))) return "\n".join(await asyncio.gather(*tasks)) # Test the Chain locally if __name__ == "__main__": with chains.run_local(): hello_chain = HelloAll() result = asyncio.get_event_loop().run_until_complete( hello_chain.run_remote(["Marius", "Sid", "Bola"])) print(result) ``` When the module is run as `__main__`, local instances of the Chainlets are created, allowing you to test the functionality of your Chain just by executing the Python file: ```bash cd hello_chain python hello.py # Hello, Marius # Hello, Sid # Hello, Bola ``` ### Mock execution of GPU Chainlets Using `run_local()` to run your code locally requires that your development environment have the compute resources and dependencies that each Chainlet needs. But that often isn't possible when building with AI models. Chains offers a workaround, mocking, to let you test the coordination and business logic of your multi-step inference pipeline without worrying about running the model locally. The second example in the [getting started guide](/chains/getting-started) implements a Truss Chain for generating poems with Phi-3. This Chain has two Chainlets: 1. The `PhiLLM` Chainlet, which requires an NVIDIA T4 GPU. 2. The `PoemGenerator` Chainlet, which easily runs on a CPU. If you have an NVIDIA T4 under your desk, good for you. For the rest of us, we can mock the `PhiLLM` Chainlet, which is infeasible to run locally, so that we can quickly test the `PoemGenerator` Chainlet. To do this, we define a mock Phi-3 model in our `__main__` module and give it a [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) method that produces a test output that matches the output type we expect from the real Chainlet.
Then, we inject an instance of this mock Chainlet into our Chain: ```python poems.py if __name__ == "__main__": class FakePhiLLM: def run_remote(self, prompt: str) -> str: return f"Here's a poem about {prompt.split(' ')[-1]}" with chains.run_local(): poem_generator = PoemGenerator(phi_llm=FakePhiLLM()) result = poem_generator.run_remote(words=["bird", "plane", "superman"]) print(result) ``` And run your Python file: ```bash python poems.py # ['Here's a poem about bird', 'Here's a poem about plane', 'Here's a poem about superman'] ``` You may notice that the argument `phi_llm` expects a type `PhiLLM`, while we are passing it an instance of `FakePhiLLM`. These aren't the same, which should be a type error. However, this works at runtime because we constructed `FakePhiLLM` to use the same protocol as the real thing. We can make this explicit by defining a `Protocol` as a type annotation: ```python from typing import Protocol class PhiProtocol(Protocol): def run_remote(self, data: str) -> str: ... ``` and changing the argument type in `PoemGenerator`: ```python @chains.mark_entrypoint class PoemGenerator(chains.ChainletBase): def __init__(self, phi_llm: PhiProtocol = chains.depends(PhiLLM)) -> None: self._phi_llm = phi_llm ``` This resolves the apparent type error. ## Chains Watch The [watch command](/chains-reference/cli#watch) (`truss chains watch`) combines the best of local development and full deployment. `watch` lets you run on an exact copy of the production hardware and interface, but with live reload so you can test changes in seconds without creating a new deployment. To use `truss chains watch`: 1. Push a chain in development mode (i.e. the `publish` and `promote` flags are false). 2. Run the watch command `truss chains watch SOURCE`. You can also add the `watch` option to the `push` command to combine both into a single step. 3. Each time you edit a file and save the changes, the watcher patches the remote deployments. Updating the deployments might take a moment, but it is generally *much* faster than creating a new deployment. 4. You can call the chain with test data via `cURL` or the call dialogue in the UI and observe the result and logs. 5. Iterate on steps 3 and 4 until your chain behaves in the desired way. ## Deploy a Chain Deploying a Chain is an atomic action that deploys every Chainlet within the Chain separately. Each Chainlet specifies its own remote environment: hardware resources, Python and system dependencies, autoscaling settings. ### Development To deploy a Chain as a development deployment, run: ```sh truss chains push ./my_chain.py ``` Where `my_chain.py` contains the entrypoint Chainlet for your Chain. Development deployments are intended for testing and can't scale past one replica. Each time you make a development deployment, it overwrites the existing development deployment. Development deployments support rapid iteration with `watch` - see the [guide above](#chains-watch). ### 🆕 Environments To deploy a Chain to an environment, run: ```sh truss chains push ./my_chain.py --environment {env_name} ``` Environments are intended for live traffic and have access to full autoscaling settings. Each time you deploy to an environment, a new deployment is created. Once the new deployment is live, it replaces the previous deployment, which is relegated to the published deployments list. [Learn more](/deploy/lifecycle#what-is-an-environment) about environments. ## Call a Chain's API endpoint Once your Chain is deployed, you can call it via its API endpoint.
Chains use the same inference API as models: * [Development endpoint](/api-reference/development-run-remote) * [Production endpoint](/api-reference/production-run-remote) * [🆕 Environment endpoint](/api-reference/environments-run-remote) * [Endpoint by ID](/api-reference/deployment-run-remote) Here's an example which calls the development deployment: ```python call_chain.py import requests import os # From the Chain overview page on Baseten # E.g. "https://chain-.api.baseten.co/development/run_remote" CHAIN_URL = "" baseten_api_key = os.environ["BASETEN_API_KEY"] # JSON keys and types match the `run_remote` method signature. data = {...} resp = requests.post( CHAIN_URL, headers={"Authorization": f"Api-Key {baseten_api_key}"}, json=data, ) print(resp.json()) ``` ### How to pass chain input The data schema of the inference request corresponds to the function signature of [`run_remote()`](/chains/concepts#run-remote-chaining-chainlets) in your entrypoint Chainlet. For example, for the Hello Chain, `HelloAll.run_remote()`: ```python def run_remote(self, names: list[str]) -> str: ``` You'd pass the following JSON payload: ```json {"names": ["Marius", "Sid", "Bola"]} ``` I.e. the keys in the JSON record match the argument names and types of `run_remote()`. ### Async chain inference Like Truss models, Chains support async invocation. The [guide for models](/invoke/async) applies largely, in particular for how to wrap the input and set up the webhook to process results. The following additional points are Chains-specific: * Use Chain-based URLs: * `https://chain-{chain}.api.baseten.co/production/async_run_remote` * `https://chain-{chain}.api.baseten.co/development/async_run_remote` * `https://chain-{chain}.api.baseten.co/deployment/{deployment}/async_run_remote` * `https://chain-{chain}.api.baseten.co/environments/{env_name}/async_run_remote` * Only the entrypoint is invoked asynchronously. Internal Chainlet-to-Chainlet calls are still run synchronously. ## Subclassing for code reuse Sometimes you want to write one "main" implementation of a complicated inference task, but then re-use it for similar variations. For example: * Deploy it on different hardware and with different concurrency. * Replace a dependency (e.g. silence detection in audio files) with a different implementation of that step, while keeping all other processing the same. * Deploy the same inference flow, but exchange the model weights used, e.g. for a large and a small version of an LLM, or for model weights fine-tuned to different domains. * Add an adapter to convert between a different input/output schema. In all of those cases, you can create lightweight subclasses of your main Chainlet. Below are some example code snippets; they can all be combined with each other! ### Example base class ```python import truss_chains as chains class Preprocess2x(chains.ChainletBase): def run_remote(self, number: int) -> int: return 2 * number class MyBaseChainlet(chains.ChainletBase): remote_config = chains.RemoteConfig( compute=chains.Compute(cpu_count=1, memory="100Mi"), options=chains.ChainletOptions(enable_b10_tracing=True), ) def __init__(self, preprocess=chains.depends(Preprocess2x)): self._preprocess = preprocess def run_remote(self, number: int) -> float: return 1.0 / self._preprocess.run_remote(number) # Assert base behavior. with chains.run_local(): chainlet = MyBaseChainlet() assert chainlet.run_remote(4) == 1 / (4 * 2) ``` ### Adapter for different I/O The base class `MyBaseChainlet` works with integer inputs and returns floats.
If you want to reuse the computation, but provide an alternative interface (e.g. for a different client with a different request/response schema), you can create a subclass which does the I/O conversion. The actual computation is delegated to the base class above. ```python class ChainletStringIO(MyBaseChainlet): def run_remote(self, number: str) -> str: return str(super().run_remote(int(number))) # Assert new behavior. with chains.run_local(): chainlet_string_io = ChainletStringIO() assert chainlet_string_io.run_remote("4") == "0.125" ``` ### Chain with substituted dependency The base class `MyBaseChainlet` uses preprocessing that doubles the input. If you want to use a different variant of preprocessing, while keeping `MyBaseChainlet.run_remote` and everything else as is, you can define a shallow subclass of `MyBaseChainlet` that uses a different dependency, `Preprocess8x`, which multiplies by 8 instead of 2. ```python class Preprocess8x(chains.ChainletBase): def run_remote(self, number: int) -> int: return 8 * number class Chainlet8xPreprocess(MyBaseChainlet): def __init__(self, preprocess=chains.depends(Preprocess8x)): super().__init__(preprocess=preprocess) # Assert new behavior. with chains.run_local(): chainlet_8x_preprocess = Chainlet8xPreprocess() assert chainlet_8x_preprocess.run_remote(4) == 1 / (4 * 8) ``` ### Override remote config If you want to re-deploy a Chain but change some deployment options, e.g. run on different hardware, you can create a subclass and override `remote_config`. ```python class Chainlet16Core(MyBaseChainlet): remote_config = chains.RemoteConfig( compute=chains.Compute(cpu_count=16, memory="100Mi"), options=chains.ChainletOptions(enable_b10_tracing=True), ) ``` Be aware that `remote_config` is a class variable. In the example above we created a completely new `RemoteConfig` value, because changing fields *in place* would also affect the base class. If you want to share config between the base class and subclasses, you can define the shared parts in additional variables, e.g. for the docker image: ```python DOCKER_IMAGE = chains.DockerImage(pip_requirements=[...], ...) class MyBaseChainlet(chains.ChainletBase): remote_config = chains.RemoteConfig(docker_image=DOCKER_IMAGE, ...) class Chainlet16Core(MyBaseChainlet): remote_config = chains.RemoteConfig(docker_image=DOCKER_IMAGE, ...) ``` # Overview Chains: A new DX for deploying multi-component ML workflows Chains is in beta mode. Read our [launch blog post](https://www.baseten.co/blog/introducing-baseten-chains/). Chains is a framework for building robust, performant multi-step and multi-model inference pipelines and deploying them to production. It addresses the common challenges of managing latency, cost, and dependencies for complex workflows, while leveraging Truss’ existing battle-tested performance, reliability, and developer toolkit.