Durable Streams

Streaming storage for large exports, audit logs, and real-time data

Durable streams provide streaming storage for large data exports, audit logs, and real-time processing. Streams follow a write-once, read-many pattern: once data is written and the stream is closed, the content is immutable and can be accessed via a permanent public URL.

When to Use Durable Streams

Storage Type      Best For
Durable Streams   Large exports, audit logs, streaming data, batch processing
Key-Value         Fast lookups, caching, configuration
Vector            Semantic search, embeddings, RAG
Object (S3)       Files, images, documents, media
Database          Structured data, complex queries, transactions

Use streams when you need to:

  • Export large datasets incrementally
  • Create audit logs while streaming to clients
  • Process data in chunks without holding everything in memory
  • Return a URL immediately while data writes in the background

Creating Streams

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamCreator', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
      compress: true,  // optional gzip compression
      metadata: { userId: input.userId },
    });
 
    // Stream is ready immediately
    ctx.logger.info('Stream created', {
      id: stream.id,
      url: stream.url,
    });
 
    return { streamId: stream.id, streamUrl: stream.url };
  },
});

Options:

  • contentType: MIME type for the stream content (e.g., text/csv, application/json)
  • compress: Enable gzip compression for smaller storage and faster transfers
  • metadata: Key-value pairs for tracking stream context (user IDs, timestamps, etc.)

Writing Data

Write data incrementally, then close the stream:

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('CSVExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('export', {
      contentType: 'text/csv',
    });
 
    // Write header
    await stream.write('Name,Email,Created\n');
 
    // Write rows
    for (const user of input.users) {
      await stream.write(`${user.name},${user.email},${user.created}\n`);
    }
 
    // Close when done
    await stream.close();
 
    ctx.logger.info('Export complete', {
      bytesWritten: stream.bytesWritten,
    });
 
    return { url: stream.url };
  },
});
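
The loop above interpolates field values directly into each row, so a name that contains a comma, a double quote, or a line break would corrupt the CSV. A minimal sketch of RFC 4180-style quoting follows. `csvField` and `csvRow` are hypothetical helper names for illustration; they are not part of the Agentuity SDK.

```typescript
// Hypothetical helpers (not part of the SDK): quote CSV fields per RFC 4180.
function csvField(value: unknown): string {
  const text = value === null || value === undefined ? '' : String(value);
  // Quote only when the field contains a delimiter, a quote, or a line break.
  if (/[",\r\n]/.test(text)) {
    // Embedded double quotes are escaped by doubling them.
    return `"${text.replace(/"/g, '""')}"`;
  }
  return text;
}

// Join already-escaped fields into one newline-terminated row.
function csvRow(values: unknown[]): string {
  return values.map(csvField).join(',') + '\n';
}
```

With these helpers, the row write in the handler could become `await stream.write(csvRow([user.name, user.email, user.created]))`.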

Always Close Streams

Streams must be closed manually with stream.close(). They do not auto-close. Failing to close a stream leaves it in an incomplete state.
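
Because streams never close on their own, an exception thrown between create and close would leave the stream incomplete. One way to guard against that is a try/finally wrapper. The sketch below assumes only the `write()` and `close()` methods used on this page; `ClosableStream` and `withStream` are hypothetical names, not SDK exports.

```typescript
// Minimal shape assumed from the examples on this page (hypothetical type name).
interface ClosableStream {
  write(chunk: string): Promise<unknown>;
  close(): Promise<unknown>;
}

// Hypothetical helper: run `work`, then close the stream whether or not it threw.
async function withStream<S extends ClosableStream, T>(
  stream: S,
  work: (stream: S) => Promise<T>,
): Promise<T> {
  try {
    return await work(stream);
  } finally {
    // Runs on success and on failure; the original error still propagates.
    await stream.close();
  }
}
```

A handler could then call `await withStream(stream, async (s) => { await s.write('Name,Email,Created\n'); })`. Whether a partially written stream should be kept or deleted after a failure is an application decision that this sketch does not make.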

Background Processing

Use ctx.waitUntil() to write data in the background while returning immediately:

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('BackgroundExporter', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('report', {
      contentType: 'application/json',
    });
 
    // Return URL immediately
    const response = { streamUrl: stream.url };
 
    // Process in background
    ctx.waitUntil(async () => {
      // fetchLargeDataset is a placeholder for your own data access function
      const data = await fetchLargeDataset(input.query);
 
      await stream.write(JSON.stringify(data, null, 2));
      await stream.close();
 
      ctx.logger.info('Background export complete');
    });
 
    return response;
  },
});
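
The example above serializes the entire dataset in a single write, which means the full result sits in memory first. When the source can be consumed incrementally, writing newline-delimited JSON one record at a time avoids that. This is a sketch under the assumption that your data source can be exposed as an `AsyncIterable`; `LineWriter` and `writeNdjson` are hypothetical names, and only the `write()` method shown on this page is relied on.

```typescript
// Minimal shape assumed from the examples on this page (hypothetical type name).
interface LineWriter {
  write(chunk: string): Promise<unknown>;
}

// Hypothetical helper: write each record as one JSON line and return the count.
async function writeNdjson<T>(
  stream: LineWriter,
  records: AsyncIterable<T>,
): Promise<number> {
  let count = 0;
  for await (const record of records) {
    // Awaiting each write keeps at most one record in flight at a time.
    await stream.write(JSON.stringify(record) + '\n');
    count++;
  }
  return count;
}
```

Inside the `ctx.waitUntil()` callback, this would replace the single `JSON.stringify` write, followed by `await stream.close()` as before.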

Listing and Deleting Streams

import { createAgent } from '@agentuity/runtime';
 
const agent = createAgent('StreamManager', {
  handler: async (ctx, input) => {
    // List streams with optional filtering
    const result = await ctx.stream.list({
      name: 'export',  // filter by stream name
      limit: 10,
    });
 
    ctx.logger.info('Found streams', {
      total: result.total,
      count: result.streams.length,
    });
 
    // Delete a stream
    await ctx.stream.delete(input.streamId);
 
    return { streams: result.streams };
  },
});
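
Listing and deleting can be combined into a cleanup routine. The sketch below is written against assumed shapes: it supposes that each listed entry exposes an `id` field and that `list` accepts the `name` and `limit` filters shown above. `StreamStore`, `StreamSummary`, and `deleteStreamsByName` are hypothetical names, so check the SDK types before relying on this.

```typescript
// Assumed shapes, modeled on the calls shown above; the real SDK types may differ.
interface StreamSummary {
  id: string;
}
interface StreamStore {
  list(params: { name?: string; limit?: number }): Promise<{ streams: StreamSummary[]; total: number }>;
  delete(id: string): Promise<unknown>;
}

// Hypothetical helper: delete streams with the given name, one page at a time.
// maxPages bounds the loop in case deletions are not immediately visible to list().
async function deleteStreamsByName(
  store: StreamStore,
  name: string,
  pageSize = 10,
  maxPages = 100,
): Promise<number> {
  let deleted = 0;
  for (let page = 0; page < maxPages; page++) {
    const { streams } = await store.list({ name, limit: pageSize });
    if (streams.length === 0) break;
    for (const entry of streams) {
      await store.delete(entry.id);
      deleted++;
    }
  }
  return deleted;
}
```

In a handler, `ctx.stream` would be passed as the `store` argument, provided its methods match the assumed shapes.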

Dual Stream Pattern

Create two streams in the same handler: one for the client response, one for audit logging.

import { createAgent } from '@agentuity/runtime';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
 
const agent = createAgent('DualStreamWriter', {
  handler: async (ctx, input) => {
    // Create two streams
    const mainStream = await ctx.stream.create('output', {
      contentType: 'text/plain',
    });
    const auditStream = await ctx.stream.create('audit', {
      contentType: 'application/json',
      metadata: { userId: input.userId },
    });
 
    // Return main stream URL immediately
    const response = { streamUrl: mainStream.url };
 
    // Process both streams in background
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });
 
      const chunks: string[] = [];
      for await (const chunk of textStream) {
        // Write to client stream
        await mainStream.write(chunk);
        chunks.push(chunk);
      }
 
      // Write audit log
      await auditStream.write(JSON.stringify({
        timestamp: new Date().toISOString(),
        userId: input.userId,
        prompt: input.message,
        response: chunks.join(''),
      }));
 
      await mainStream.close();
      await auditStream.close();
    });
 
    return response;
  },
});

Content Moderation While Streaming

Buffer the model output and evaluate it in real time, sentence by sentence, using an LLM-as-a-judge pattern:

import { createAgent } from '@agentuity/runtime';
import { generateObject, streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { groq } from '@ai-sdk/groq';
import { s } from '@agentuity/schema';
 
const agent = createAgent('ModeratedStreamer', {
  handler: async (ctx, input) => {
    const stream = await ctx.stream.create('moderated', {
      contentType: 'text/plain',
    });
 
    ctx.waitUntil(async () => {
      const { textStream } = streamText({
        model: openai('gpt-5-mini'),
        prompt: input.message,
      });
 
      let buffer = '';
      const sentenceEnd = /[.!?]\s/;
 
      for await (const chunk of textStream) {
        buffer += chunk;
 
        // Check complete sentences
        if (sentenceEnd.test(buffer)) {
          const isAppropriate = await moderateContent(buffer);
 
          if (isAppropriate) {
            await stream.write(buffer);
          } else {
            ctx.logger.warn('Content blocked', { content: buffer });
            await stream.write('[Content filtered]');
          }
          buffer = '';
        }
      }
 
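      // Moderate and flush whatever remains after the last sentence boundary
      if (buffer) {
        const isAppropriate = await moderateContent(buffer);
        await stream.write(isAppropriate ? buffer : '[Content filtered]');
      }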
 
      await stream.close();
    });
 
    return { streamUrl: stream.url };
  },
});
 
// Use Groq for low-latency moderation
async function moderateContent(text: string): Promise<boolean> {
  const { object } = await generateObject({
    model: groq('openai/gpt-oss-20b'),
    schema: s.object({
      safe: s.boolean(),
      reason: s.optional(s.string()),
    }),
    prompt: `Is this content appropriate? Respond with safe=true if appropriate, safe=false if it contains harmful content.\n\nContent: "${text}"`,
  });
 
  return object.safe;
}
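
The loop above sends the whole buffer to the moderator as soon as it contains a sentence boundary, which can include the start of the next, unfinished sentence. A possible refinement is to split the buffer at the last boundary and carry the remainder forward. The sketch below uses the same `/[.!?]\s/` boundary as the example; `splitAtLastSentence` is a hypothetical helper name.

```typescript
// Hypothetical helper: split a buffer into the text up to and including the last
// sentence boundary, and the unfinished remainder after it.
function splitAtLastSentence(buffer: string): { complete: string; rest: string } {
  const boundary = /[.!?]\s/g;
  let end = -1;
  let match: RegExpExecArray | null;
  // Walk every boundary and remember where the last one ends.
  while ((match = boundary.exec(buffer)) !== null) {
    end = match.index + match[0].length;
  }
  if (end === -1) {
    return { complete: '', rest: buffer };
  }
  return { complete: buffer.slice(0, end), rest: buffer.slice(end) };
}
```

In the streaming loop, `complete` would be moderated and written when it is non-empty, and `buffer` would be set to `rest` instead of being cleared.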

Stream Properties

Property              Description
stream.id             Unique stream identifier
stream.url            Public URL to access the stream
stream.bytesWritten   Total bytes written so far
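
On the consuming side, `stream.url` can be read like any other HTTP resource. The sketch below uses only the standard Fetch and Web Streams APIs (available in browsers and in Node.js 18 or later) to process the body chunk by chunk. `readStreamChunks` is a hypothetical helper name, and the sketch makes no assumption about how compressed streams are served or how a stream that is still being written behaves.

```typescript
// Hypothetical consumer-side helper: read a stream URL chunk by chunk using the
// standard Fetch and Web Streams APIs. `fetchImpl` is injectable for testing.
async function readStreamChunks(
  url: string,
  onChunk: (text: string) => void,
  fetchImpl: typeof fetch = fetch,
): Promise<void> {
  const res = await fetchImpl(url);
  if (!res.ok || !res.body) {
    throw new Error(`Failed to read stream: HTTP ${res.status}`);
  }
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters intact across chunk boundaries.
    onChunk(decoder.decode(value, { stream: true }));
  }
  // Flush any bytes the decoder was still holding.
  const tail = decoder.decode();
  if (tail) onChunk(tail);
}
```

A caller would pass the URL returned by the agent, for example `await readStreamChunks(streamUrl, (text) => process.stdout.write(text))`.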

Best Practices

  • Use compression: Enable compress: true for large text exports to reduce storage and transfer time
  • Return URLs early: Use ctx.waitUntil() to return the stream URL while writing in the background
  • Clean up: Delete streams after they're no longer needed to free storage
  • Set content types: Always specify the correct MIME type for proper browser handling
