EventStream reference

EventStream opens a Server-Sent Events connection to a session and dispatches parsed, validated events to your callback. It handles reconnection, sequence tracking, and resumption automatically.

Source

Defined in packages/sdk/src/stream.ts. Re-exported from @ethosagent/sdk.

Quick start

import { EventStream } from '@ethosagent/sdk';

const sub = EventStream({
  baseUrl: 'http://localhost:2400',
  apiKey: 'esk_...',
  sessionId: 'ses_abc123',
  onEvent(event, seq) {
    if (event.type === 'text_delta') process.stdout.write(event.text);
    if (event.type === 'done') console.log('\n--- turn complete ---');
  },
  onError(err) {
    console.error('Stream error:', err);
  },
});

// Later:
sub.close();

EventStreamOptions

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| baseUrl | string | yes | Origin of the Ethos web-api server. |
| apiKey | string | no | Bearer token. Omit for cookie-auth mode (sends credentials: 'include'). |
| sessionId | string | yes | Session to subscribe to. |
| sinceSeq | number | no | Resume from this sequence number. Events with id <= sinceSeq are skipped server-side. |
| signal | AbortSignal | no | External abort signal. When aborted, the stream closes cleanly. |
| onEvent | (event: SseEvent, seq: number) => void | yes | Called for every validated event. |
| onError | (err: unknown) => void | no | Called on connection failures and parse errors. |

EventStreamSubscription

The return value of EventStream().

| Property / Method | Type | Description |
| --- | --- | --- |
| close() | () => void | Abort the connection. Sets closed to true. |
| lastSeq | number (readonly) | Sequence number of the last successfully processed event. |
| closed | boolean (readonly) | true after close() is called or the connection is permanently lost. |

Reconnection

When the connection drops (network error, server restart), EventStream waits 3 seconds and retries automatically. It passes the lastEventId query parameter so the server can resume from where the client left off. The loop continues until close() is called or the provided signal is aborted.
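
The retry decision described above can be sketched as a small pure function. This is an illustration only, not the SDK's internal code: the names StreamState, RetryPlan, recordEvent, and planRetry are hypothetical, and sending the last processed sequence as lastEventId is an assumption based on the lastSeq property.

```typescript
// Hypothetical sketch of the reconnection decision; not the SDK's internals.
const RETRY_DELAY_MS = 3000; // the 3-second wait described above

interface StreamState {
  lastSeq: number | undefined; // sequence of the last processed event, if any
  closed: boolean;             // true once close() has been called
}

interface RetryPlan {
  delayMs: number;                 // how long to wait before reconnecting
  lastEventId: number | undefined; // value to send so the server can resume
}

// Record a processed event so a later reconnect can resume after it.
function recordEvent(state: StreamState, seq: number): StreamState {
  return { ...state, lastSeq: seq };
}

// Decide what to do after a dropped connection.
// Returns null when the loop should stop (closed, or external signal aborted).
function planRetry(state: StreamState, signal?: AbortSignal): RetryPlan | null {
  if (state.closed || (signal !== undefined && signal.aborted)) return null;
  return { delayMs: RETRY_DELAY_MS, lastEventId: state.lastSeq };
}
```

Keeping the decision separate from the timer and the network call makes the stop conditions (close() and an aborted signal) easy to check in isolation.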

SSE endpoint

The stream connects to:

GET <baseUrl>/sse/sessions/<sessionId>?lastEventId=<sinceSeq>

Each SSE frame has an id: line (monotonic integer sequence) and a data: line (JSON matching SseEventSchema).
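
To make the wire format concrete, here is a minimal sketch of building the endpoint URL and reading one frame. buildStreamUrl and parseFrame are hypothetical helpers, not SDK exports. Omitting lastEventId when no sequence is known is an assumption, and the validation step against SseEventSchema is left out.

```typescript
// Hypothetical helpers for illustration; the SDK does this work internally.

// Build the stream URL. Assumes baseUrl is an origin, as the options table states.
function buildStreamUrl(baseUrl: string, sessionId: string, sinceSeq?: number): string {
  const url = new URL(`/sse/sessions/${encodeURIComponent(sessionId)}`, baseUrl);
  // Assumption: the parameter is only sent when a sequence number is known.
  if (sinceSeq !== undefined) url.searchParams.set('lastEventId', String(sinceSeq));
  return url.toString();
}

interface ParsedFrame {
  seq: number;   // integer taken from the id: line
  data: unknown; // parsed JSON from the data: line(s), not yet schema-validated
}

// Parse a single SSE frame (the text between two blank lines).
// Returns null when the id is missing or non-integer, or the data is not valid JSON.
function parseFrame(raw: string): ParsedFrame | null {
  let id: string | undefined;
  const dataLines: string[] = [];
  for (const line of raw.split(/\r?\n/)) {
    if (line.startsWith('id:')) id = line.slice(3).trim();
    else if (line.startsWith('data:')) dataLines.push(line.slice(5).replace(/^ /, ''));
  }
  if (id === undefined || id === '' || dataLines.length === 0) return null;
  const seq = Number(id);
  if (!Number.isInteger(seq)) return null;
  try {
    return { seq, data: JSON.parse(dataLines.join('\n')) };
  } catch {
    return null;
  }
}
```

In real use the parsed data would still need to pass schema validation before reaching onEvent.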

SSE event types

Events are a discriminated union on the type field. They fall into two families.

Per-turn events

These fire during an active agent turn and mirror AgentEvent from @ethosagent/core.

| Type | Key fields | Description |
| --- | --- | --- |
| text_delta | text | Incremental text token from the LLM. |
| thinking_delta | thinking | Extended-thinking token (when enabled). |
| tool_start | toolCallId, toolName, args | Tool execution began. |
| tool_progress | toolName, message, percent?, audience | Progress update from a running tool. |
| tool_end | toolCallId, toolName, ok, durationMs, result? | Tool execution finished. |
| usage | inputTokens, outputTokens, estimatedCostUsd | Token usage for the current API call. |
| context_metadata | | Arbitrary metadata attached to the turn context. |
| done | text, turnCount | Turn completed. text is the full assistant response. |
| error | error, code | Turn failed with an error. |
| message_persisted | messageId, role | A message was persisted to the session store. |
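
One way to consume the per-turn events is to fold them into a view of the current turn. The sketch below uses a local subset of the events in the table. TurnEvent, TurnView, and applyTurnEvent are hypothetical names, and the field types (for example, error and code as strings) are assumptions rather than the SDK's definitions.

```typescript
// Local subset of the per-turn events; field types are assumed for illustration.
type TurnEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolCallId: string; toolName: string; args: unknown }
  | { type: 'tool_end'; toolCallId: string; toolName: string; ok: boolean; durationMs: number }
  | { type: 'done'; text: string; turnCount: number }
  | { type: 'error'; error: string; code: string };

interface TurnView {
  text: string;           // assistant text accumulated so far
  runningTools: string[]; // toolCallIds that have started but not ended
  status: 'streaming' | 'done' | 'error';
}

const emptyTurn: TurnView = { text: '', runningTools: [], status: 'streaming' };

// Fold one event into the view without mutating the previous view.
function applyTurnEvent(view: TurnView, event: TurnEvent): TurnView {
  switch (event.type) {
    case 'text_delta':
      return { ...view, text: view.text + event.text };
    case 'tool_start':
      return { ...view, runningTools: [...view.runningTools, event.toolCallId] };
    case 'tool_end': {
      const endedId = event.toolCallId;
      return { ...view, runningTools: view.runningTools.filter((id) => id !== endedId) };
    }
    case 'done':
      // done.text is the full assistant response, so it replaces the accumulated deltas.
      return { ...view, text: event.text, runningTools: [], status: 'done' };
    case 'error':
      return { ...view, runningTools: [], status: 'error' };
  }
}
```

A UI could call applyTurnEvent from onEvent and re-render from the returned view after each event.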

Push events

These arrive regardless of whether a turn is active. They notify the client of system-wide state changes.

| Type | Key fields | Description |
| --- | --- | --- |
| tool.approval_required | request (ApprovalRequest) | A tool call needs user approval before proceeding. |
| approval.resolved | approvalId, decision, decidedBy | An approval was resolved (possibly by another tab). |
| clarify.request | requestId, question, options?, default?, defaultDeadlineAt | The agent asked a clarification question mid-turn. |
| clarify.resolved | requestId, source | A clarification was answered or timed out. |
| cron.fired | jobId, ranAt, outputPath? | A cron job completed a run. |
| mesh.changed | agents | The agent mesh topology changed. |
| evolve.skill_pending | skillId, personalityId?, proposedAt | The skill evolver proposed a new or rewritten skill. |
| protocol.upgrade_required | serverVersion, clientVersionExpected | The server requires a newer client version. |
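
Because push events can arrive outside a turn, it may help to route them separately from per-turn events. The following sketch lists the push types from the table above and dispatches on them. PUSH_EVENT_TYPES, isPushEventType, and dispatchPush are hypothetical names and do not come from the SDK.

```typescript
// Push event type names copied from the table above; the helpers are hypothetical.
const PUSH_EVENT_TYPES = [
  'tool.approval_required',
  'approval.resolved',
  'clarify.request',
  'clarify.resolved',
  'cron.fired',
  'mesh.changed',
  'evolve.skill_pending',
  'protocol.upgrade_required',
] as const;

type PushEventType = (typeof PUSH_EVENT_TYPES)[number];

const pushTypeSet = new Set<string>(PUSH_EVENT_TYPES);

// Type guard: true when the type string belongs to the push family.
function isPushEventType(type: string): type is PushEventType {
  return pushTypeSet.has(type);
}

type PushHandlers = Partial<Record<PushEventType, (event: { type: string }) => void>>;

// Route a push event to its handler, if one is registered.
// Returns false for per-turn events so the caller can process them elsewhere.
function dispatchPush(event: { type: string }, handlers: PushHandlers): boolean {
  const type = event.type;
  if (!isPushEventType(type)) return false;
  const handler = handlers[type];
  if (handler) handler(event);
  return true;
}
```

An onEvent callback could call dispatchPush first and fall through to per-turn handling when it returns false.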

Narrowing events

The SseEvent type is a Zod discriminated union. Use event.type in a switch or if-check to narrow:

onEvent(event) {
  switch (event.type) {
    case 'text_delta':
      // event is { type: 'text_delta'; text: string }
      break;
    case 'tool_start':
      // event is { type: 'tool_start'; toolCallId: string; toolName: string; args: unknown }
      break;
  }
}
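
Narrowing can also be made exhaustive, so the compiler flags any event type a switch forgets to handle. The sketch below uses a three-member stand-in union rather than the real SseEvent. SampleEvent, assertNever, and summarize are hypothetical names.

```typescript
// A small stand-in union; the real SseEvent has many more members.
type SampleEvent =
  | { type: 'text_delta'; text: string }
  | { type: 'tool_start'; toolCallId: string; toolName: string; args: unknown }
  | { type: 'done'; text: string; turnCount: number };

// Reaching this at compile time means a case is missing from the switch.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function summarize(event: SampleEvent): string {
  switch (event.type) {
    case 'text_delta':
      return `text: ${event.text}`;
    case 'tool_start':
      return `tool ${event.toolName} started (${event.toolCallId})`;
    case 'done':
      return `done after ${event.turnCount} turn(s)`;
    default:
      // event has type never here; adding a member to SampleEvent makes this line fail to compile.
      return assertNever(event);
  }
}
```

With the full union, a default branch that logs and ignores unknown types may be a better fit if the server can add event types ahead of the client.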