Real-time Monitoring
VaultSandbox supports real-time email notifications via Server-Sent Events (SSE), enabling instant processing of emails as they arrive. The .NET client exposes this through IAsyncEnumerable<Email> for natural async enumeration.
Basic Streaming
Section titled “Basic Streaming”Subscribe to Single Inbox
Section titled “Subscribe to Single Inbox”var inbox = await client.CreateInboxAsync();
Console.WriteLine($"Monitoring: {inbox.EmailAddress}");
await foreach (var email in inbox.WatchAsync(cancellationToken)){ Console.WriteLine($"New email: {email.Subject}"); Console.WriteLine($" From: {email.From}"); Console.WriteLine($" Received: {email.ReceivedAt}");}With Timeout
Section titled “With Timeout”using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try{ await foreach (var email in inbox.WatchAsync(cts.Token)) { Console.WriteLine($"Received: {email.Subject}"); await ProcessEmailAsync(email); }}catch (OperationCanceledException){ Console.WriteLine("Monitoring stopped after timeout");}Processing Patterns
Section titled “Processing Patterns”Conditional Processing
Section titled “Conditional Processing”await foreach (var email in inbox.WatchAsync(cancellationToken)){ if (email.Subject.Contains("urgent", StringComparison.OrdinalIgnoreCase)) { await HandleUrgentEmailAsync(email); } else { await HandleNormalEmailAsync(email); }}Break on Condition
Section titled “Break on Condition”await foreach (var email in inbox.WatchAsync(cancellationToken)){ if (email.Subject.Contains("Verification")) { await ProcessVerificationEmailAsync(email); break; // Stop monitoring after finding the email }}Filter with LINQ
Section titled “Filter with LINQ”// Process only emails from a specific sender
await foreach (var email in inbox.WatchAsync(cancellationToken).Where(e => e.From == "alerts@example.com")){ await HandleAlertAsync(email);}
// Take first 5 emailsawait foreach (var email in inbox.WatchAsync(cancellationToken).Take(5)){ Console.WriteLine($"Email {email.Subject}");}Monitoring Multiple Inboxes
Section titled “Monitoring Multiple Inboxes”Using InboxMonitor
Section titled “Using InboxMonitor”var inbox1 = await client.CreateInboxAsync();var inbox2 = await client.CreateInboxAsync();var inbox3 = await client.CreateInboxAsync();
var monitor = client.MonitorInboxes(inbox1, inbox2, inbox3);
await foreach (var evt in monitor.WatchAsync(cancellationToken)){ Console.WriteLine($"Email in {evt.InboxAddress}"); Console.WriteLine($" Subject: {evt.Email.Subject}"); Console.WriteLine($" From: {evt.Email.From}");}Monitoring with Handlers
Section titled “Monitoring with Handlers”var monitor = client.MonitorInboxes(inbox1, inbox2);
await foreach (var evt in monitor.WatchAsync(cancellationToken)){ if (evt.Email.From == "alerts@example.com") { await HandleAlertAsync(evt.Email); } else if (evt.Email.Subject.Contains("Invoice")) { await HandleInvoiceAsync(evt.Inbox, evt.Email); } else { Console.WriteLine($"Other email: {evt.Email.Subject}"); }}Real-World Patterns
Section titled “Real-World Patterns”Wait for Specific Email
Section titled “Wait for Specific Email”async Task<Email> WaitForSpecificEmailAsync( IInbox inbox, Func<Email, bool> predicate, TimeSpan timeout, CancellationToken cancellationToken = default){ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(timeout);
try { await foreach (var email in inbox.WatchAsync(cts.Token)) { if (predicate(email)) { return email; } }
throw new VaultSandboxTimeoutException("Stream ended without matching email", timeout); } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { throw new VaultSandboxTimeoutException("Timeout waiting for email", timeout); }}
// Usagevar email = await WaitForSpecificEmailAsync( inbox, e => e.Subject.Contains("Password Reset"), TimeSpan.FromSeconds(10));Collect Multiple Emails
Section titled “Collect Multiple Emails”async Task<List<Email>> CollectEmailsAsync( IInbox inbox, int count, TimeSpan timeout, CancellationToken cancellationToken = default){ var emails = new List<Email>(); using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(timeout);
try { await foreach (var email in inbox.WatchAsync(cts.Token)) { emails.Add(email); Console.WriteLine($"Received {emails.Count}/{count}");
if (emails.Count >= count) { return emails; } }
throw new VaultSandboxTimeoutException( $"Stream ended: only received {emails.Count}/{count}", timeout); } catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested) { throw new VaultSandboxTimeoutException( $"Timeout: only received {emails.Count}/{count}", timeout); }}
// Usagevar emails = await CollectEmailsAsync(inbox, 3, TimeSpan.FromSeconds(20));Process Email Pipeline
Section titled “Process Email Pipeline”async Task ProcessEmailPipelineAsync( IInbox inbox, CancellationToken cancellationToken){ await foreach (var email in inbox.WatchAsync(cancellationToken)) { try { Console.WriteLine($"Processing: {email.Subject}");
// Step 1: Validate var validation = email.AuthResults?.Validate(); if (validation?.Passed != true) { Console.WriteLine($"Failed auth: {string.Join(", ", validation?.Failures ?? [])}"); continue; }
// Step 2: Extract data var links = email.Links; var attachments = email.Attachments;
// Step 3: Store/process await StoreEmailAsync(email);
// Step 4: Notify await NotifyProcessedAsync(email.Id);
Console.WriteLine($"Processed: {email.Subject}"); } catch (Exception ex) { Console.WriteLine($"Error processing: {ex.Message}"); } }}Integration with Channels
Section titled “Integration with Channels”using System.Threading.Channels;
var channel = Channel.CreateUnbounded<Email>();
// Producer taskvar producerTask = Task.Run(async () =>{ try { await foreach (var email in inbox.WatchAsync(cancellationToken)) { await channel.Writer.WriteAsync(email, cancellationToken); } } finally { channel.Writer.Complete(); }});
// Consumerawait foreach (var email in channel.Reader.ReadAllAsync(cancellationToken)){ await ProcessAsync(email);}
await producerTask;Testing with Real-Time Monitoring
Section titled “Testing with Real-Time Monitoring”Integration Test
Section titled “Integration Test”[Fact]public async Task Real_Time_Email_Processing(){ var inbox = await _client.CreateInboxAsync(); var received = new List<Email>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Start monitoring in background var monitorTask = Task.Run(async () => { try { await foreach (var email in inbox.WatchAsync(cts.Token)) { received.Add(email); if (received.Count >= 2) break; } } catch (OperationCanceledException) { } });
// Send test emails await SendEmailAsync(inbox.EmailAddress, "Test 1"); await SendEmailAsync(inbox.EmailAddress, "Test 2");
// Wait for monitor to complete await monitorTask;
Assert.Equal(2, received.Count); Assert.Equal("Test 1", received[0].Subject); Assert.Equal("Test 2", received[1].Subject);
await _client.DeleteInboxAsync(inbox.EmailAddress);}Async Processing Test
Section titled “Async Processing Test”[Fact]public async Task Processes_Emails_Asynchronously(){ var inbox = await _client.CreateInboxAsync(); var processed = new List<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var monitorTask = Task.Run(async () => { await foreach (var email in inbox.WatchAsync(cts.Token)) { await ProcessEmailAsync(email); processed.Add(email.Id); break; } });
await SendEmailAsync(inbox.EmailAddress, "Test");
await monitorTask;
Assert.Single(processed);
await _client.DeleteInboxAsync(inbox.EmailAddress);}Error Handling
Section titled “Error Handling”Handle Stream Errors
Section titled “Handle Stream Errors”try{ await foreach (var email in inbox.WatchAsync(cancellationToken)) { try { await ProcessEmailAsync(email); } catch (Exception ex) { Console.WriteLine($"Error processing email: {ex.Message}"); // Continue processing other emails } }}catch (VaultSandboxException ex){ Console.WriteLine($"Stream error: {ex.Message}");}Graceful Shutdown
Section titled “Graceful Shutdown”class EmailProcessor : IAsyncDisposable{ private readonly CancellationTokenSource _cts = new(); private Task? _processingTask;
public void Start(IInbox inbox) { _processingTask = ProcessAsync(inbox, _cts.Token); }
private async Task ProcessAsync(IInbox inbox, CancellationToken cancellationToken) { try { await foreach (var email in inbox.WatchAsync(cancellationToken)) { await HandleEmailAsync(email); } } catch (OperationCanceledException) { Console.WriteLine("Processing cancelled"); } }
public async ValueTask DisposeAsync() { _cts.Cancel(); if (_processingTask != null) { try { await _processingTask; } catch (OperationCanceledException) { } } _cts.Dispose(); }}
// Usageawait using var processor = new EmailProcessor();processor.Start(inbox);
// ... do other work ...
// Processor is gracefully stopped on disposeSSE vs Polling
Section titled “SSE vs Polling”When to Use SSE
Section titled “When to Use SSE”Use SSE (real-time) when:
- You need instant notification of new emails
- You are processing emails as they arrive
- You are building real-time dashboards
- Minimizing latency is critical
var client = VaultSandboxClientBuilder.Create() .WithBaseUrl(url) .WithApiKey(apiKey) .WithDeliveryStrategy(DeliveryStrategy.Sse) .Build();When to Use Polling
Section titled “When to Use Polling”Use polling when:
- SSE is blocked by a firewall or proxy
- Running in environments that don’t support persistent connections
- Batch processing is acceptable
var client = VaultSandboxClientBuilder.Create() .WithBaseUrl(url) .WithApiKey(apiKey) .WithDeliveryStrategy(DeliveryStrategy.Polling) .WithPollInterval(TimeSpan.FromSeconds(2)) .Build();Auto Strategy (Recommended)
Section titled “Auto Strategy (Recommended)”var client = VaultSandboxClientBuilder.Create() .WithBaseUrl(url) .WithApiKey(apiKey) .WithDeliveryStrategy(DeliveryStrategy.Auto) // Tries SSE, falls back to polling .Build();Advanced Patterns
Section titled “Advanced Patterns”Rate-Limited Processing
Section titled “Rate-Limited Processing”await foreach (var email in inbox.WatchAsync(cancellationToken)){ await ProcessEmailAsync(email); await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); // Rate limit}Priority Processing
Section titled “Priority Processing”await foreach (var email in inbox.WatchAsync(cancellationToken)){ var priority = GetPriority(email);
switch (priority) { case Priority.High: await ProcessImmediatelyAsync(email); break; case Priority.Medium: await QueueForProcessingAsync(email); break; default: await LogAndDiscardAsync(email); break; }}
Priority GetPriority(Email email){ if (email.Subject.Contains("URGENT")) return Priority.High; if (email.Attachments?.Count > 0) return Priority.Medium; return Priority.Low;}Parallel Processing with Semaphore
Section titled “Parallel Processing with Semaphore”using var semaphore = new SemaphoreSlim(3); // Max 3 concurrent
await Parallel.ForEachAsync( inbox.WatchAsync(cancellationToken), new ParallelOptions { MaxDegreeOfParallelism = 3 }, async (email, ct) => { await semaphore.WaitAsync(ct); try { await ProcessEmailAsync(email); } finally { semaphore.Release(); } });Cleanup
Section titled “Cleanup”Proper Cleanup in Tests
Section titled “Proper Cleanup in Tests”public class EmailMonitoringTests : IAsyncLifetime{ private IVaultSandboxClient _client = null!; private IInbox _inbox = null!; private CancellationTokenSource _cts = null!;
public async Task InitializeAsync() { _client = VaultSandboxClientBuilder.Create() .WithBaseUrl(Environment.GetEnvironmentVariable("VAULTSANDBOX_URL")!) .WithApiKey(Environment.GetEnvironmentVariable("VAULTSANDBOX_API_KEY")!) .Build();
_inbox = await _client.CreateInboxAsync(); _cts = new CancellationTokenSource(); }
public async Task DisposeAsync() { _cts.Cancel(); _cts.Dispose();
if (_inbox != null) { await _client.DeleteInboxAsync(_inbox.EmailAddress); } }
[Fact] public async Task Monitors_Emails() { _cts.CancelAfter(TimeSpan.FromSeconds(5));
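try { await foreach (var email in _inbox.WatchAsync(_cts.Token)) { Console.WriteLine($"Email: {email.Subject}"); } } catch (OperationCanceledException) { /* expected once the 5-second monitoring window elapses */ } }}Best Practices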
Section titled “Best Practices”Always Use CancellationToken
Section titled “Always Use CancellationToken”// Good: Allows graceful cancellationawait foreach (var email in inbox.WatchAsync(cancellationToken)){ await ProcessAsync(email);}
// Bad: No way to stop monitoringawait foreach (var email in inbox.WatchAsync(default)){ await ProcessAsync(email);}Use ConfigureAwait When Appropriate
Section titled “Use ConfigureAwait When Appropriate”// In library code, use ConfigureAwait(false)await foreach (var email in inbox.WatchAsync(cancellationToken) .ConfigureAwait(false)){ await ProcessAsync(email).ConfigureAwait(false);}Handle Reconnection
Section titled “Handle Reconnection”The SDK automatically handles reconnection. Configure behavior in client options:
var client = VaultSandboxClientBuilder.Create() .WithBaseUrl(url) .WithApiKey(apiKey) .WithSseReconnectInterval(TimeSpan.FromSeconds(5)) .WithSseMaxReconnectAttempts(10) .Build();Next Steps
Section titled “Next Steps”- Waiting for Emails - Alternative polling-based approach
- Managing Inboxes - Inbox operations
- Delivery Strategies - SSE vs Polling deep dive
- Configuration - Configure SSE behavior