Accessing logs programmatically is an experimental feature powered by the same GraphQL API used within the Baseten application. The code snippet below should serve as a starting point. Please keep your request rate reasonable when using this feature.

This feature is provided “as-is” and may be discontinued or changed in a backwards-incompatible way in the future.
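Before running the script, fill in your deployment ID and API key and adjust the time range. The script queries the logs endpoint in batches of up to 500 entries, pages forward through the requested window, skips entries that are duplicated across pages, and writes each log line to a local file.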
import json
import logging
import time
from datetime import datetime, timedelta, timezone

# NOTE: requests is a third-party package; install it with `pip install requests`
import requests


# SET YOUR PARAMETERS HERE (UTC)
start_date = datetime.fromisoformat("2023-06-12T16:00:00+00:00")
end_date = datetime.fromisoformat("2023-06-12T17:00:00+00:00")

entity_params = {
    "type": "MODEL",
    "subtype": "EXECUTION",
    "entity_id": "",  # Provide deployment ID here
}

api_key = ""  # TODO: provide your API key here
batch_size = 500

output_file = f'model-{time.strftime("%Y%m%d-%H%M%S")}.logs'


# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)


def get_logs_in_range(start_date, end_date):
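    # Start/end are sent as Unix timestamps in milliseconds; the returned
    # log entries carry timestamps in nanoseconds (see parse_ts below).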
    request_payload = {
        **entity_params,
        "start": int(start_date.timestamp() * 1000),
        "end": int(end_date.timestamp() * 1000),
        "limit": batch_size,
        "direction": "forward",
        "levels": [],
    }

    headers = {
        "Authorization": f"Api-Key {api_key}",
        "Content-Type": "application/json",
    }
    
    response = requests.post(
        "https://app.baseten.co/logs", data=json.dumps(request_payload), headers=headers
    )

    response.raise_for_status()
    json_response = response.json()
    if not json_response["success"]:
        raise Exception(f"An error occurred: {json_response['error']}")

    return json_response["logs"]


def parse_ts(ts):
    # Log timestamps are nanoseconds since the epoch; convert to a UTC
    # datetime, truncated to whole seconds.
    return datetime.fromtimestamp(int(ts) // 10**9, tz=timezone.utc)


with open(output_file, "w") as f:
    logger.info(f"Writing logs to {output_file}")
    cursor = start_date
    last_ts = None

    while cursor < end_date:
        logs = get_logs_in_range(cursor, end_date)

        logger.info("Loaded %s logs from cursor %s", len(logs), cursor.isoformat())

        has_more = len(logs) == batch_size

        for log in logs:
            if last_ts and int(log["ts"]) <= last_ts:
                # We expect some duplication between pages, so skip any logs we've already seen
                continue
            ts = parse_ts(log["ts"])
            f.write(ts.isoformat() + " " + log["msg"] + "\n")

        if has_more:
            # Resume from the timestamp of the last log received; the
            # duplicate check above skips anything already written.
            last_ts = int(logs[-1]["ts"])
            cursor = parse_ts(last_ts).replace(microsecond=0)
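            # A short pause between pages keeps the request rate modest; the
            # exact rate limit isn't documented, so one second per request is
            # just a conservative guess.
            time.sleep(1)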
        else:
            cursor = end_date
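Each line in the resulting .logs file is the entry's UTC timestamp, truncated to whole seconds, followed by the log message, which makes the output straightforward to grep or sort once the export completes.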