using ControlPlane.Core.Messages; using System.Collections.Concurrent; using System.Threading.Channels; namespace ControlPlane.Api.Services; /// /// Thin in-process pub/sub for SSE. MassTransit consumer writes here; /// the SSE endpoint reads and streams to the browser. /// public sealed class SseEventBus { private readonly ConcurrentDictionary>> _subs = new(); public void Publish(ProvisioningProgressEvent evt) { if (!_subs.TryGetValue(evt.JobId, out var channels)) return; lock (channels) foreach (var ch in channels) ch.Writer.TryWrite(evt); } public Channel Subscribe(Guid jobId) { var ch = Channel.CreateUnbounded(); _subs.GetOrAdd(jobId, _ => []).Add(ch); return ch; } public void Unsubscribe(Guid jobId, Channel channel) { if (_subs.TryGetValue(jobId, out var channels)) { lock (channels) channels.Remove(channel); channel.Writer.TryComplete(); } } }