SegOps AIDocs

Python SDK

Thread-safe event client with built-in batching, the public-key session handshake, and graceful shutdown. Zero required runtime dependencies (stdlib only).

Installation#

bash
pip install segops

Requires Python 3.9+.

Quick start#

python
import segops

client = segops.SegOpsClient("https://api.segops.ai", "sk_...")

client.track(
    user_id="user-123",
    event_type="order_placed",
    payload={"order_id": "ord-456", "total": 89.95},
)

client.shutdown()  # flush + stop the background thread on exit

Or use it as a context manager (flushes on exit):

python
with segops.SegOpsClient("https://api.segops.ai", "sk_...") as client:
    client.track(user_id="user-123", event_type="page_viewed", payload={"path": "/home"})

Which key do I use?#

Where the code runsKeyWhat to pass
Your server (Django, Flask, scripts)sk_… secretapi_key only
A client you can't trust with a secretpk_… publicapi_key and get_user — the SDK runs the session handshake automatically

Never put an sk_ key anywhere a user can read it. Python is primarily a server-side language, so sk_ is the common case; pk_ is supported for parity.

When the key starts with pk_, the SDK calls /api/auth/session/ on first use, caches the short-lived token, refreshes it 60s before expiry, and re-mints on a 401. If the key requires a signed user_id, produce the signature on your backend with segops.sign_user_id and pass the fields through get_user.

python
client = segops.SegOpsClient(
    "https://api.segops.ai",
    "pk_...",
    get_user=lambda: segops.UserContext(
        user_id=current_user.id,
        user_id_sig=signed["user_id_sig"],  # from your backend
        user_id_ts=signed["user_id_ts"],
    ),
)

Constructor#

python
SegOpsClient(
    api_url,
    api_key,
    *,
    get_user=None,
    batch=True,
    batch_size=20,
    flush_interval=5.0,
    on_error=None,
)
ArgumentDefaultDescription
api_urlBase URL of your SegOps deployment
api_keyAPI key — sk_… (server) or pk_… (handshake)
get_userRequired for pk_…: returns a UserContext at mint time
batchTrueBuffer events and flush automatically; False sends each immediately
batch_size20Max events before an automatic flush
flush_interval5.0Periodic flush interval (seconds)
on_errorlog to stderrCalled when a background flush fails

Methods#

track(event=None, *, user_id, event_type, occurred_at=None, payload=None)#

Enqueue an event. Accepts an Event (or dict) or keyword arguments. Non-blocking and safe for concurrent use.

python
client.track(user_id="user-123", event_type="page_viewed", payload={"path": "/home"})
# OccurredAt defaults to the current UTC time when omitted.

# Or with a dataclass:
client.track(segops.Event(user_id="user-123", event_type="page_viewed"))

identify(context=None, *, user_id, traits=None)#

Record user traits as a context_identified event.

python
client.identify(
    user_id="user-123",
    traits={"email": "[email protected]", "plan": "starter", "company": "Acme Corp"},
)

flush()#

Flush all buffered events immediately. Blocks until the HTTP request completes.

shutdown() / close()#

Stop the background thread and flush any remaining events. Always call this before the process exits to avoid data loss. close() is an alias, and the client is a context manager that flushes on exit.

reset_session()#

Drop the cached session token so the next event re-mints with the current get_user() identity. Call after a login/logout. No-op for sk_ clients.

Using it with a web framework#

SegOpsClient is plain, framework-agnostic Python (zero runtime dependencies), so it works anywhere — Django, FastAPI, Flask, Starlette, FastMCP, Celery workers, or a one-off script. Create one client per process and reuse it.

The Claude Code skills wire it into common stacks for you (integrate-django, integrate-fastapi, integrate-flask, integrate-node for Node) — see Install with Claude Code.

Sync frameworks (Flask, Django, WSGI)#

Call the client directly and flush on shutdown:

python
import atexit
from segops import SegOpsClient

client = SegOpsClient(os.environ["SEGOPS_API_URL"], os.environ["SEGOPS_SECRET_KEY"])
atexit.register(client.shutdown)   # flush on a clean worker shutdown

Async frameworks (FastAPI, Starlette, async FastMCP tools)#

The client is synchronous with a background flush thread. track() is normally just a queue append, but every batch_size-th call triggers an inline flush, and flush()/shutdown() always block — so in an async def handler, offload to a threadpool to keep the event loop free:

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from segops import SegOpsClient

client = SegOpsClient(os.environ["SEGOPS_API_URL"], os.environ["SEGOPS_SECRET_KEY"])

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    await run_in_threadpool(client.shutdown)   # flush on shutdown, off-loop

app = FastAPI(lifespan=lifespan)

@app.post("/checkout")
async def checkout(...):
    await run_in_threadpool(client.track, user_id=uid, event_type="checkout_completed")

(Calling client.track(...) from a sync def endpoint is also fine — FastAPI runs those in a threadpool for you.)

Types#

python
@dataclass
class Event:
    user_id: str          # Required
    event_type: str       # Required; snake_case
    occurred_at: str | None = None  # Optional; ISO-8601. Defaults to now (UTC).
    payload: dict = {}    # Optional

@dataclass
class Context:
    user_id: str
    traits: dict = {}

Signing a user id (backend only)#

Run this on your backend only — never ship the HMAC secret to a client.

python
import os
import segops

signed = segops.sign_user_id(os.environ["SEGOPS_HMAC_SECRET"], current_user.id)
# -> {"user_id": ..., "user_id_sig": ..., "user_id_ts": ...} — hand to get_user()

See Server-side Signing for the full reference. Enable "require signed user_id" on the public key so the server rejects unsigned or forged identities.

Error handling#

The background flush thread reports errors via the on_error callback (default: log to stderr). flush() and shutdown() raise on a non-2xx response so you can handle them directly. Events are never dropped on shutdown — the client guarantees a final flush.