using ControlPlane.Core.Messages;
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace ControlPlane.Api.Services;
///
/// Thin in-process pub/sub for SSE. MassTransit consumer writes here;
/// the SSE endpoint reads and streams to the browser.
///
public sealed class SseEventBus
{
private readonly ConcurrentDictionary>> _subs = new();
public void Publish(ProvisioningProgressEvent evt)
{
if (!_subs.TryGetValue(evt.JobId, out var channels)) return;
lock (channels)
foreach (var ch in channels)
ch.Writer.TryWrite(evt);
}
public Channel Subscribe(Guid jobId)
{
var ch = Channel.CreateUnbounded();
_subs.GetOrAdd(jobId, _ => []).Add(ch);
return ch;
}
public void Unsubscribe(Guid jobId, Channel channel)
{
if (_subs.TryGetValue(jobId, out var channels))
{
lock (channels) channels.Remove(channel);
channel.Writer.TryComplete();
}
}
}