Accessing logs programmatically is an experimental feature powered by the same GraphQL API used within the Baseten application. The code snippet below should serve as a starting point. Please keep your request rate reasonable when using this feature.
This feature is provided “as-is” and may be discontinued or changed in a backwards-incompatible way in the future.
import json
import logging
import time
from datetime import datetime, timedelta, timezone
# NOTE: requests is a third-party package (pip install requests)
import requests
# SET YOUR PARAMETERS HERE (UTC)
start_date = datetime.fromisoformat("2023-06-12T16:00:00+00:00")
end_date = datetime.fromisoformat("2023-06-12T17:00:00+00:00")
entity_params = {
    "type": "MODEL",
    "subtype": "EXECUTION",
    "entity_id": "",  # Provide deployment ID here
}
api_key = "" # TODO: provide your API key here
batch_size = 500
output_file = f'model-{time.strftime("%Y%m%d-%H%M%S")}.logs'
# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

def get_logs_in_range(start_date, end_date):
    request_payload = {
        **entity_params,
        "start": int(start_date.timestamp() * 1000),
        "end": int(end_date.timestamp() * 1000),
        "limit": batch_size,
        "direction": "forward",
        "levels": [],
    }
    headers = {
        "Authorization": f"Api-Key {api_key}",
        "Content-Type": "application/json",
    }
    response = requests.post(
        "https://app.baseten.co/logs", data=json.dumps(request_payload), headers=headers
    )
    response.raise_for_status()
    json_response = response.json()
    if not json_response["success"]:
        raise Exception("An error occurred: %s" % json_response["error"])
    return json_response["logs"]

def parse_ts(ts):
    # Log timestamps are in nanoseconds; convert to an aware UTC datetime
    return datetime.fromtimestamp(int(ts) // 10**9, tz=timezone.utc)

with open(output_file, "w") as f:
    logger.info(f"Writing logs to {output_file}")
    cursor = start_date
    last_ts = None
    while cursor < end_date:
        logs = get_logs_in_range(cursor, end_date)
        logger.info("Loaded %s logs from cursor %s", len(logs), cursor.isoformat())
        has_more = len(logs) == batch_size
        for log in logs:
            if last_ts and int(log["ts"]) <= last_ts:
                # We expect some duplication between pages, so skip any logs we've already seen
                continue
            ts = parse_ts(log["ts"])
            f.write(ts.isoformat() + " " + log["msg"] + "\n")
        if has_more:
            last_ts = int(logs[-1]["ts"])
            cursor = parse_ts(last_ts).replace(microsecond=0)
        else:
            cursor = end_date
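
The loop above issues paginated requests back to back. If you are exporting a large time range, consider pausing between pages to keep the request rate modest. Below is a minimal sketch of one way to do that; the wrapper name and the one-second pause are arbitrary choices for illustration, not a documented rate limit.

def get_logs_in_range_throttled(start_date, end_date, pause_seconds=1.0):
    # Hypothetical wrapper around get_logs_in_range that pauses after each request.
    # pause_seconds is an arbitrary default, not a documented rate limit.
    logs = get_logs_in_range(start_date, end_date)
    time.sleep(pause_seconds)
    return logs

To apply the delay, swap the get_logs_in_range call inside the while loop for this wrapper.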