39 lines
1.1 KiB
C#
39 lines
1.1 KiB
C#
using ControlPlane.Core.Messages;
|
|
using System.Collections.Concurrent;
|
|
using System.Threading.Channels;
|
|
|
|
namespace ControlPlane.Api.Services;
|
|
|
|
/// <summary>
/// Thin in-process pub/sub for SSE. MassTransit consumer writes here;
/// the SSE endpoint reads and streams to the browser.
/// </summary>
/// <remarks>
/// Every read and write of the subscriber table happens under one private lock,
/// so <see cref="Publish"/>, <see cref="Subscribe"/> and <see cref="Unsubscribe"/>
/// may be called concurrently. A job's entry is dropped as soon as its last
/// subscriber leaves, so the table does not grow with the number of jobs seen.
/// </remarks>
public sealed class SseEventBus
{
    // Single gate for the table and the per-job lists. List<T> is not safe for
    // concurrent mutation, and pruning empty entries needs the lookup, the list
    // edit and the removal to be one atomic step.
    private readonly object _gate = new();

    private readonly Dictionary<Guid, List<Channel<ProvisioningProgressEvent>>> _subs = new();

    /// <summary>
    /// Writes <paramref name="evt"/> to every channel currently subscribed to
    /// <c>evt.JobId</c>. Does nothing when that job has no subscribers.
    /// </summary>
    /// <param name="evt">The progress event to fan out.</param>
    public void Publish(ProvisioningProgressEvent evt)
    {
        lock (_gate)
        {
            if (!_subs.TryGetValue(evt.JobId, out var channels))
            {
                return;
            }

            foreach (var ch in channels)
            {
                // Best-effort delivery: the result of TryWrite is intentionally ignored.
                ch.Writer.TryWrite(evt);
            }
        }
    }

    /// <summary>
    /// Creates a new unbounded channel and registers it for events of <paramref name="jobId"/>.
    /// </summary>
    /// <param name="jobId">The job whose events the caller wants to receive.</param>
    /// <returns>
    /// The channel to read events from. Pass it back to <see cref="Unsubscribe"/> when done.
    /// </returns>
    public Channel<ProvisioningProgressEvent> Subscribe(Guid jobId)
    {
        var ch = Channel.CreateUnbounded<ProvisioningProgressEvent>();

        lock (_gate)
        {
            if (!_subs.TryGetValue(jobId, out var channels))
            {
                channels = [];
                _subs[jobId] = channels;
            }

            channels.Add(ch);
        }

        return ch;
    }

    /// <summary>
    /// Removes <paramref name="channel"/> from the subscribers of <paramref name="jobId"/>
    /// and completes its writer. Safe to call more than once for the same channel.
    /// </summary>
    /// <param name="jobId">The job the channel was subscribed to.</param>
    /// <param name="channel">The channel previously returned by <see cref="Subscribe"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is <c>null</c>.</exception>
    public void Unsubscribe(Guid jobId, Channel<ProvisioningProgressEvent> channel)
    {
        ArgumentNullException.ThrowIfNull(channel);

        lock (_gate)
        {
            if (_subs.TryGetValue(jobId, out var channels))
            {
                channels.Remove(channel);

                // Drop the entry with its last subscriber; otherwise one empty
                // list would be retained per job for the life of the process.
                if (channels.Count == 0)
                {
                    _subs.Remove(jobId);
                }
            }
        }

        // Complete even when the job entry is already gone so a reader of this
        // channel is never left waiting; TryComplete is a no-op if already completed.
        channel.Writer.TryComplete();
    }
}
|