# Durable Streams

Streaming storage for large exports, audit logs, and real-time data.
Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a write-once, read-many pattern: once data is written and the stream is closed, the content is immutable and can be accessed via a permanent public URL.
## When to Use Durable Streams
| Storage Type | Best For |
|---|---|
| Durable Streams | Large exports, audit logs, streaming data, batch processing |
| Key-Value | Fast lookups, caching, configuration |
| Vector | Semantic search, embeddings, RAG |
| Object (S3) | Files, images, documents, media |
| Database | Structured data, complex queries, transactions |
Use streams when you need to:
- Export large datasets incrementally
- Create audit logs while streaming to clients
- Process data in chunks without holding everything in memory
- Return a URL immediately while data writes in the background
## Creating Streams

```typescript
import { createAgent } from '@agentuity/runtime';
const agent = createAgent('StreamCreator', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('export', {
contentType: 'text/csv',
compress: true, // optional gzip compression
metadata: { userId: input.userId },
});
// Stream is ready immediately
ctx.logger.info('Stream created', {
id: stream.id,
url: stream.url,
});
return { streamId: stream.id, streamUrl: stream.url };
},
});
```

Options:
- `contentType`: MIME type for the stream content (e.g., `text/csv`, `application/json`)
- `compress`: Enable gzip compression for smaller storage and faster transfers
- `metadata`: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)
## Writing Data

Write data incrementally, then close the stream:

```typescript
const agent = createAgent('CSVExporter', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('export', {
contentType: 'text/csv',
});
// Write header
await stream.write('Name,Email,Created\n');
// Write rows
for (const user of input.users) {
await stream.write(`${user.name},${user.email},${user.created}\n`);
}
// Close when done
await stream.close();
ctx.logger.info('Export complete', {
bytesWritten: stream.bytesWritten,
});
return { url: stream.url };
},
});
```

### Always Close Streams

Streams must be closed manually with `stream.close()`. They do not auto-close. Failing to close a stream leaves it in an incomplete state.
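Since an exception thrown between `create()` and `close()` leaves the stream open, one defensive pattern is to wrap writes in `try`/`finally` so `close()` runs on every path. A minimal sketch (the `SafeExporter` agent and `input.rows` field are illustrative, not part of the API):

```typescript
import { createAgent } from '@agentuity/runtime';

const agent = createAgent('SafeExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'application/json',
    });
    try {
      // Any write can throw (bad input, network issues, etc.)
      await stream.write(JSON.stringify(input.rows));
    } finally {
      // Runs on every path, so the stream never lingers half-open
      await stream.close();
    }
    return { url: stream.url };
  },
});
```

Note that this still closes, and therefore publishes, whatever was written before a failure; if partial exports are unacceptable for your use case, you might instead delete the stream with `ctx.stream.delete(stream.id)` in a `catch` block.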
## Background Processing

Use `ctx.waitUntil()` to write data in the background while returning immediately:

```typescript
const agent = createAgent('BackgroundExporter', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('report', {
contentType: 'application/json',
});
// Return URL immediately
const response = { streamUrl: stream.url };
// Process in background
ctx.waitUntil(async () => {
const data = await fetchLargeDataset(input.query);
await stream.write(JSON.stringify(data, null, 2));
await stream.close();
ctx.logger.info('Background export complete');
});
return response;
},
});
```

## Listing and Deleting Streams

```typescript
const agent = createAgent('StreamManager', {
handler: async (ctx, input) => {
// List streams with optional filtering
const result = await ctx.stream.list({
name: 'export', // filter by stream name
limit: 10,
});
ctx.logger.info('Found streams', {
total: result.total,
count: result.streams.length,
});
// Delete a stream
await ctx.stream.delete(input.streamId);
return { streams: result.streams };
},
});
```

## Dual Stream Pattern

Create two streams simultaneously: one for the client response, one for audit logging.

```typescript
import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
const agent = createAgent('DualStreamWriter', {
handler: async (ctx, input) => {
// Create two streams
const mainStream = await ctx.stream.create('output', {
contentType: 'text/plain',
});
const auditStream = await ctx.stream.create('audit', {
contentType: 'application/json',
metadata: { userId: input.userId },
});
// Return main stream URL immediately
const response = { streamUrl: mainStream.url };
// Process both streams in background
ctx.waitUntil(async () => {
const { textStream } = streamText({
model: openai('gpt-5-mini'),
prompt: input.message,
});
const chunks: string[] = [];
for await (const chunk of textStream) {
// Write to client stream
await mainStream.write(chunk);
chunks.push(chunk);
}
// Write audit log
await auditStream.write(JSON.stringify({
timestamp: new Date().toISOString(),
userId: input.userId,
prompt: input.message,
response: chunks.join(''),
}));
await mainStream.close();
await auditStream.close();
});
return response;
},
});
```

## Content Moderation While Streaming

Buffer and evaluate content in real time using an LLM-as-a-judge pattern:

```typescript
import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';
const agent = createAgent('ModeratedStreamer', {
handler: async (ctx, input) => {
const stream = await ctx.stream.create('moderated', {
contentType: 'text/plain',
});
ctx.waitUntil(async () => {
const { textStream } = streamText({
model: openai('gpt-5-mini'),
prompt: input.message,
});
let buffer = '';
const sentenceEnd = /[.!?]\s/;
for await (const chunk of textStream) {
buffer += chunk;
// Check complete sentences
if (sentenceEnd.test(buffer)) {
const isAppropriate = await moderateContent(buffer);
if (isAppropriate) {
await stream.write(buffer);
} else {
ctx.logger.warn('Content blocked', { content: buffer });
await stream.write('[Content filtered]');
}
buffer = '';
}
}
// Flush remaining buffer
if (buffer) {
await stream.write(buffer);
}
await stream.close();
});
return { streamUrl: stream.url };
},
});
// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
const { object } = await generateObject({
model: groq('openai/gpt-oss-20b'),
schema: s.object({
safe: s.boolean(),
reason: s.optional(s.string()),
}),
prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
});
return object.safe;
}
```

## Stream Properties
| Property | Description |
|---|---|
| `stream.id` | Unique stream identifier |
| `stream.url` | Public URL to access the stream |
| `stream.bytesWritten` | Total bytes written so far |
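Because `stream.url` is a plain public URL, consumers don't need the SDK to read a stream; any HTTP client works. A minimal sketch, assuming a `fetch`-capable runtime such as Node 18+ (the `readStreamUrl` helper is hypothetical):

```typescript
// Hypothetical helper: read a durable stream from its public URL.
// Assumes `stream.url` is readable with a plain HTTP GET.
async function readStreamUrl(streamUrl: string): Promise<string> {
  const response = await fetch(streamUrl);
  if (!response.ok || !response.body) {
    throw new Error(`Failed to fetch stream: ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let text = '';
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // Decode each chunk as it arrives; a consumer could also process
    // chunks incrementally instead of accumulating them
    text += decoder.decode(value, { stream: true });
  }
  return text + decoder.decode(); // flush any buffered multi-byte sequence
}
```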
## Best Practices

- **Use compression**: Enable `compress: true` for large text exports to reduce storage and transfer time
- **Return URLs early**: Use `ctx.waitUntil()` to return the stream URL while writing in the background
- **Clean up**: Delete streams after they're no longer needed to free storage
- **Set content types**: Always specify the correct MIME type for proper browser handling
## Next Steps
- Key-Value Storage: Fast caching and configuration
- Vector Storage: Semantic search and embeddings
- Object Storage (S3): File and media storage
- Database: Relational data with queries and transactions
- Streaming Responses: HTTP streaming patterns
## Need Help?
Join our Community for assistance or just to hang with other humans building agents.
Send us an email at hi@agentuity.com if you'd like to get in touch.
If you haven't already, sign up for your free account now and start building your first agent!