Files
2026-04-25 18:05:57 -04:00

39 lines
1.1 KiB
C#

using ControlPlane.Core.Messages;
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace ControlPlane.Api.Services;
/// <summary>
/// Thin in-process pub/sub for SSE. MassTransit consumer writes here;
/// the SSE endpoint reads and streams to the browser.
/// </summary>
/// <remarks>
/// Subscribers are grouped per job id. Each per-job list is a plain <see cref="List{T}"/>,
/// so every read or mutation of a list happens while holding a lock on that list.
/// A list is evicted from the dictionary as soon as its last subscriber leaves,
/// so finished jobs do not accumulate entries.
/// </remarks>
public sealed class SseEventBus
{
    private readonly ConcurrentDictionary<Guid, List<Channel<ProvisioningProgressEvent>>> _subs = new();

    /// <summary>
    /// Delivers <paramref name="evt"/> to every channel currently subscribed to the event's job id.
    /// Does nothing when the job has no subscribers.
    /// </summary>
    /// <param name="evt">The progress event to fan out; its <c>JobId</c> selects the subscribers.</param>
    public void Publish(ProvisioningProgressEvent evt)
    {
        if (!_subs.TryGetValue(evt.JobId, out var channels))
        {
            return;
        }

        lock (channels)
        {
            foreach (var ch in channels)
            {
                // Best-effort delivery: the result of TryWrite is deliberately ignored.
                ch.Writer.TryWrite(evt);
            }
        }
    }

    /// <summary>
    /// Creates a new unbounded channel and registers it to receive events for <paramref name="jobId"/>.
    /// </summary>
    /// <param name="jobId">The job whose events the caller wants to receive.</param>
    /// <returns>The channel to read events from; pass it to <see cref="Unsubscribe"/> when done.</returns>
    public Channel<ProvisioningProgressEvent> Subscribe(Guid jobId)
    {
        var ch = Channel.CreateUnbounded<ProvisioningProgressEvent>();

        while (true)
        {
            var channels = _subs.GetOrAdd(jobId, _ => []);

            lock (channels)
            {
                // Unsubscribe evicts an empty list while holding this same lock. If that
                // happened between GetOrAdd and acquiring the lock, this list is orphaned
                // and Publish would never see it, so fetch/create a fresh one and retry.
                if (_subs.TryGetValue(jobId, out var current) && ReferenceEquals(current, channels))
                {
                    channels.Add(ch);
                    return ch;
                }
            }
        }
    }

    /// <summary>
    /// Removes <paramref name="channel"/> from the subscribers of <paramref name="jobId"/>
    /// and completes its writer so a pending reader finishes.
    /// </summary>
    /// <param name="jobId">The job id the channel was subscribed under.</param>
    /// <param name="channel">The channel previously returned by <see cref="Subscribe"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is <c>null</c>.</exception>
    public void Unsubscribe(Guid jobId, Channel<ProvisioningProgressEvent> channel)
    {
        ArgumentNullException.ThrowIfNull(channel);

        if (_subs.TryGetValue(jobId, out var channels))
        {
            lock (channels)
            {
                channels.Remove(channel);

                if (channels.Count == 0)
                {
                    // Remove only if the key still maps to this exact list, so a newer
                    // list created for the same job id is never evicted by mistake.
                    _subs.TryRemove(KeyValuePair.Create(jobId, channels));
                }
            }
        }

        // Complete even when the job's list is already gone; TryComplete is a no-op
        // on a writer that was completed earlier.
        channel.Writer.TryComplete();
    }
}