Skip to content

Real-time Monitoring

VaultSandbox supports real-time email notifications via Server-Sent Events (SSE), enabling instant processing of emails as they arrive. The .NET client exposes this through IAsyncEnumerable<Email> for natural async enumeration.

var inbox = await client.CreateInboxAsync();
Console.WriteLine($"Monitoring: {inbox.EmailAddress}");
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
Console.WriteLine($"New email: {email.Subject}");
Console.WriteLine($" From: {email.From}");
Console.WriteLine($" Received: {email.ReceivedAt}");
}
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
await foreach (var email in inbox.WatchAsync(cts.Token))
{
Console.WriteLine($"Received: {email.Subject}");
await ProcessEmailAsync(email);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Monitoring stopped after timeout");
}
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
if (email.Subject.Contains("urgent", StringComparison.OrdinalIgnoreCase))
{
await HandleUrgentEmailAsync(email);
}
else
{
await HandleNormalEmailAsync(email);
}
}
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
if (email.Subject.Contains("Verification"))
{
await ProcessVerificationEmailAsync(email);
break; // Stop monitoring after finding the email
}
}
// Process only emails from specific sender
await foreach (var email in inbox.WatchAsync(cancellationToken)
.Where(e => e.From == "[email protected]"))
{
await HandleAlertAsync(email);
}
// Take first 5 emails
await foreach (var email in inbox.WatchAsync(cancellationToken).Take(5))
{
Console.WriteLine($"Email {email.Subject}");
}
var inbox1 = await client.CreateInboxAsync();
var inbox2 = await client.CreateInboxAsync();
var inbox3 = await client.CreateInboxAsync();
var monitor = client.MonitorInboxes(inbox1, inbox2, inbox3);
await foreach (var evt in monitor.WatchAsync(cancellationToken))
{
Console.WriteLine($"Email in {evt.InboxAddress}");
Console.WriteLine($" Subject: {evt.Email.Subject}");
Console.WriteLine($" From: {evt.Email.From}");
}
var monitor = client.MonitorInboxes(inbox1, inbox2);
await foreach (var evt in monitor.WatchAsync(cancellationToken))
{
if (evt.Email.From == "[email protected]")
{
await HandleAlertAsync(evt.Email);
}
else if (evt.Email.Subject.Contains("Invoice"))
{
await HandleInvoiceAsync(evt.Inbox, evt.Email);
}
else
{
Console.WriteLine($"Other email: {evt.Email.Subject}");
}
}
async Task<Email> WaitForSpecificEmailAsync(
IInbox inbox,
Func<Email, bool> predicate,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
try
{
await foreach (var email in inbox.WatchAsync(cts.Token))
{
if (predicate(email))
{
return email;
}
}
throw new VaultSandboxTimeoutException("Stream ended without matching email", timeout);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
throw new VaultSandboxTimeoutException("Timeout waiting for email", timeout);
}
}
// Usage
var email = await WaitForSpecificEmailAsync(
inbox,
e => e.Subject.Contains("Password Reset"),
TimeSpan.FromSeconds(10));
async Task<List<Email>> CollectEmailsAsync(
IInbox inbox,
int count,
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
var emails = new List<Email>();
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);
try
{
await foreach (var email in inbox.WatchAsync(cts.Token))
{
emails.Add(email);
Console.WriteLine($"Received {emails.Count}/{count}");
if (emails.Count >= count)
{
return emails;
}
}
throw new VaultSandboxTimeoutException(
$"Stream ended: only received {emails.Count}/{count}", timeout);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
throw new VaultSandboxTimeoutException(
$"Timeout: only received {emails.Count}/{count}", timeout);
}
}
// Usage
var emails = await CollectEmailsAsync(inbox, 3, TimeSpan.FromSeconds(20));
async Task ProcessEmailPipelineAsync(
IInbox inbox,
CancellationToken cancellationToken)
{
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
try
{
Console.WriteLine($"Processing: {email.Subject}");
// Step 1: Validate
var validation = email.AuthResults?.Validate();
if (validation?.Passed != true)
{
Console.WriteLine($"Failed auth: {string.Join(", ", validation?.Failures ?? [])}");
continue;
}
// Step 2: Extract data
var links = email.Links;
var attachments = email.Attachments;
// Step 3: Store/process
await StoreEmailAsync(email);
// Step 4: Notify
await NotifyProcessedAsync(email.Id);
Console.WriteLine($"Processed: {email.Subject}");
}
catch (Exception ex)
{
Console.WriteLine($"Error processing: {ex.Message}");
}
}
}
using System.Threading.Channels;
var channel = Channel.CreateUnbounded<Email>();
// Producer task
var producerTask = Task.Run(async () =>
{
try
{
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
await channel.Writer.WriteAsync(email, cancellationToken);
}
}
finally
{
channel.Writer.Complete();
}
});
// Consumer
await foreach (var email in channel.Reader.ReadAllAsync(cancellationToken))
{
await ProcessAsync(email);
}
await producerTask;
[Fact]
public async Task Real_Time_Email_Processing()
{
var inbox = await _client.CreateInboxAsync();
var received = new List<Email>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// Start monitoring in background
var monitorTask = Task.Run(async () =>
{
try
{
await foreach (var email in inbox.WatchAsync(cts.Token))
{
received.Add(email);
if (received.Count >= 2) break;
}
}
catch (OperationCanceledException) { }
});
// Send test emails
await SendEmailAsync(inbox.EmailAddress, "Test 1");
await SendEmailAsync(inbox.EmailAddress, "Test 2");
// Wait for monitor to complete
await monitorTask;
Assert.Equal(2, received.Count);
Assert.Equal("Test 1", received[0].Subject);
Assert.Equal("Test 2", received[1].Subject);
await _client.DeleteInboxAsync(inbox.EmailAddress);
}
[Fact]
public async Task Processes_Emails_Asynchronously()
{
var inbox = await _client.CreateInboxAsync();
var processed = new List<string>();
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var monitorTask = Task.Run(async () =>
{
await foreach (var email in inbox.WatchAsync(cts.Token))
{
await ProcessEmailAsync(email);
processed.Add(email.Id);
break;
}
});
await SendEmailAsync(inbox.EmailAddress, "Test");
await monitorTask;
Assert.Single(processed);
await _client.DeleteInboxAsync(inbox.EmailAddress);
}
try
{
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
try
{
await ProcessEmailAsync(email);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing email: {ex.Message}");
// Continue processing other emails
}
}
}
catch (VaultSandboxException ex)
{
Console.WriteLine($"Stream error: {ex.Message}");
}
class EmailProcessor : IAsyncDisposable
{
private readonly CancellationTokenSource _cts = new();
private Task? _processingTask;
public void Start(IInbox inbox)
{
_processingTask = ProcessAsync(inbox, _cts.Token);
}
private async Task ProcessAsync(IInbox inbox, CancellationToken cancellationToken)
{
try
{
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
await HandleEmailAsync(email);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Processing cancelled");
}
}
public async ValueTask DisposeAsync()
{
_cts.Cancel();
if (_processingTask != null)
{
try
{
await _processingTask;
}
catch (OperationCanceledException) { }
}
_cts.Dispose();
}
}
// Usage
await using var processor = new EmailProcessor();
processor.Start(inbox);
// ... do other work ...
// Processor is gracefully stopped on dispose

Use SSE (real-time) when:

  • You need instant notification of new emails
  • Processing emails as they arrive
  • Building real-time dashboards
  • Minimizing latency is critical
var client = VaultSandboxClientBuilder.Create()
.WithBaseUrl(url)
.WithApiKey(apiKey)
.WithDeliveryStrategy(DeliveryStrategy.Sse)
.Build();

Use polling when:

  • SSE is blocked by firewall/proxy
  • Running in environments that don’t support persistent connections
  • Batch processing is acceptable
var client = VaultSandboxClientBuilder.Create()
.WithBaseUrl(url)
.WithApiKey(apiKey)
.WithDeliveryStrategy(DeliveryStrategy.Polling)
.WithPollInterval(TimeSpan.FromSeconds(2))
.Build();
var client = VaultSandboxClientBuilder.Create()
.WithBaseUrl(url)
.WithApiKey(apiKey)
.WithDeliveryStrategy(DeliveryStrategy.Auto) // Tries SSE, falls back to polling
.Build();
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
await ProcessEmailAsync(email);
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); // Rate limit
}
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
var priority = GetPriority(email);
switch (priority)
{
case Priority.High:
await ProcessImmediatelyAsync(email);
break;
case Priority.Medium:
await QueueForProcessingAsync(email);
break;
default:
await LogAndDiscardAsync(email);
break;
}
}
Priority GetPriority(Email email)
{
if (email.Subject.Contains("URGENT")) return Priority.High;
if (email.From == "[email protected]") return Priority.High;
if (email.Attachments?.Count > 0) return Priority.Medium;
return Priority.Low;
}
using var semaphore = new SemaphoreSlim(3); // Max 3 concurrent
await Parallel.ForEachAsync(
inbox.WatchAsync(cancellationToken),
new ParallelOptions { MaxDegreeOfParallelism = 3 },
async (email, ct) =>
{
await semaphore.WaitAsync(ct);
try
{
await ProcessEmailAsync(email);
}
finally
{
semaphore.Release();
}
});
public class EmailMonitoringTests : IAsyncLifetime
{
private IVaultSandboxClient _client = null!;
private IInbox _inbox = null!;
private CancellationTokenSource _cts = null!;
public async Task InitializeAsync()
{
_client = VaultSandboxClientBuilder.Create()
.WithBaseUrl(Environment.GetEnvironmentVariable("VAULTSANDBOX_URL")!)
.WithApiKey(Environment.GetEnvironmentVariable("VAULTSANDBOX_API_KEY")!)
.Build();
_inbox = await _client.CreateInboxAsync();
_cts = new CancellationTokenSource();
}
public async Task DisposeAsync()
{
_cts.Cancel();
_cts.Dispose();
if (_inbox != null)
{
await _client.DeleteInboxAsync(_inbox.EmailAddress);
}
}
[Fact]
public async Task Monitors_Emails()
{
_cts.CancelAfter(TimeSpan.FromSeconds(5));
await foreach (var email in _inbox.WatchAsync(_cts.Token))
{
Console.WriteLine($"Email: {email.Subject}");
}
}
}
// Good: Allows graceful cancellation
await foreach (var email in inbox.WatchAsync(cancellationToken))
{
await ProcessAsync(email);
}
// Bad: No way to stop monitoring
await foreach (var email in inbox.WatchAsync(default))
{
await ProcessAsync(email);
}
// In library code, use ConfigureAwait(false)
await foreach (var email in inbox.WatchAsync(cancellationToken)
.ConfigureAwait(false))
{
await ProcessAsync(email).ConfigureAwait(false);
}

The SDK automatically handles reconnection. Configure behavior in client options:

var client = VaultSandboxClientBuilder.Create()
.WithBaseUrl(url)
.WithApiKey(apiKey)
.WithSseReconnectInterval(TimeSpan.FromSeconds(5))
.WithSseMaxReconnectAttempts(10)
.Build();