Files
OPC/ControlPlane.Worker/ProvisioningWorker.cs
2026-04-25 18:05:57 -04:00

168 lines
6.8 KiB
C#

using ControlPlane.Core.Config;
using ControlPlane.Core.Interfaces;
using ControlPlane.Core.Messages;
using ControlPlane.Core.Models;
using ControlPlane.Core.Services;
using MassTransit;
using Microsoft.Extensions.Options;
namespace ControlPlane.Worker;
/// <summary>
/// MassTransit consumer. Triggered by ProvisionClientCommand off RabbitMQ.
/// Runs the registered saga steps in order and publishes a
/// <see cref="ProvisioningProgressEvent"/> for each step transition. When a step throws,
/// the steps that already completed are compensated in reverse order and the job is
/// reported as failed; when every step succeeds, a <see cref="TenantRecord"/> is saved
/// through the tenant registry.
/// </summary>
public sealed class ProvisioningConsumer(
    IEnumerable<ISagaStep> steps,
    IPublishEndpoint bus,
    IConfiguration config,
    IOptions<ClarityInfraOptions> infraOptions,
    TenantRegistryService registry,
    ILogger<ProvisioningConsumer> logger) : IConsumer<ProvisionClientCommand>
{
    /// <summary>
    /// Builds a <see cref="ProvisioningJob"/> from the incoming command and runs the saga for it.
    /// </summary>
    /// <param name="context">Consume context carrying the command and the cancellation token.</param>
    public async Task Consume(ConsumeContext<ProvisionClientCommand> context)
    {
        var cmd = context.Message;
        var job = new ProvisioningJob
        {
            Id = cmd.JobId,
            ClientName = cmd.ClientName,
            StateCode = cmd.StateCode,
            Subdomain = cmd.Subdomain,
            AdminEmail = cmd.AdminEmail,
            SiteCode = cmd.SiteCode,
            Environment = cmd.Environment,
            Status = ProvisioningStatus.Running
        };

        logger.LogInformation("Starting provisioning saga for job {JobId} ({Client})", job.Id, job.ClientName);
        await RunSagaAsync(job, context.CancellationToken);
    }

    /// <summary>
    /// Executes every saga step in registration order. On the first failure the completed
    /// steps are rolled back, a <c>job_failed</c> event is published and the method returns
    /// normally (the exception is not rethrown). On success a <c>job_complete</c> event is
    /// published and the tenant record is saved.
    /// </summary>
    /// <param name="job">The job being provisioned; its status fields are updated in place.</param>
    /// <param name="cancellationToken">Token passed to each step's <c>ExecuteAsync</c>.</param>
    private async Task RunSagaAsync(ProvisioningJob job, CancellationToken cancellationToken)
    {
        var sagaContext = new SagaContext { Job = job };
        var executedSteps = new Stack<ISagaStep>();

        foreach (var step in steps)
        {
            try
            {
                await Publish(job.Id, "step_started", step.StepName, $"Starting: {step.StepName}");
                logger.LogInformation("[{JobId}] Executing: {Step}", job.Id, step.StepName);
                await step.ExecuteAsync(sagaContext, cancellationToken);
                executedSteps.Push(step);
                await Publish(job.Id, "step_complete", step.StepName, $"Completed: {step.StepName}");
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "[{JobId}] Step {Step} failed", job.Id, step.StepName);

                // Failure notifications are best-effort: if the bus itself is the problem,
                // a throwing publish must not prevent the rollback below from running.
                await TryNotifyAsync(job.Id, () => Publish(job.Id, "step_failed", step.StepName, $"Failed: {step.StepName} - {ex.Message}"));
                await TryNotifyAsync(job.Id, () => PublishDiagnostic(job.Id, step.StepName, ex));

                job.Status = ProvisioningStatus.Compensating;
                job.FailureReason = $"{step.StepName}: {ex.Message}";

                // Roll back with an un-cancelled token. This catch also handles
                // OperationCanceledException, and handing the already-cancelled token to
                // the compensations would let them abort before undoing anything.
                await CompensateAsync(sagaContext, executedSteps, CancellationToken.None);

                job.Status = ProvisioningStatus.Failed;

                // The terminal event is not best-effort: if it cannot be published the
                // exception propagates out of Consume so the failure is not silent.
                await Publish(job.Id, "job_failed", null, job.FailureReason);
                return;
            }
        }

        var completedAt = DateTimeOffset.UtcNow;
        job.Status = ProvisioningStatus.Completed;
        job.CompletedAt = completedAt;
        await Publish(job.Id, "job_complete", null, "All steps completed successfully.");

        SaveTenantRecord(job, sagaContext, completedAt);
        logger.LogInformation("[{JobId}] Provisioning completed. Tenant record saved.", job.Id);
    }

    /// <summary>
    /// Builds the <see cref="TenantRecord"/> for a successfully provisioned job and saves it
    /// through the registry.
    /// </summary>
    /// <param name="job">The completed job.</param>
    /// <param name="sagaContext">Context holding the values the steps produced.</param>
    /// <param name="completedAt">Completion timestamp already stored on the job.</param>
    private void SaveTenantRecord(ProvisioningJob job, SagaContext sagaContext, DateTimeOffset completedAt)
    {
        var infra = infraOptions.Value;
        var publicUrl = infra.TenantPublicUrl(job.Subdomain);

        // The nginx conf path is only recorded when a conf.d directory is configured.
        var nginxConfPath = config["Nginx:ConfDPath"] is { } confDirectory
            ? Path.Combine(confDirectory, $"{job.Subdomain}.conf")
            : null;

        var record = new TenantRecord
        {
            JobId = job.Id.ToString(),
            Subdomain = job.Subdomain,
            ClientName = job.ClientName,
            StateCode = job.StateCode,
            AdminEmail = job.AdminEmail,
            SiteCode = job.SiteCode,
            Environment = job.Environment,
            // NOTE(review): Tier is not set from the command in Consume, so this is
            // whatever ProvisioningJob defaults to unless a step assigns it - confirm.
            Tier = job.Tier.ToString(),
            // Prefer the URL reported by the saga; fall back to the public tenant URL.
            ApiBaseUrl = sagaContext.TenantApiBaseUrl ?? publicUrl,
            Status = "Provisioned",
            ProvisionedAt = completedAt.ToString("o"),
            ContainerName = sagaContext.ContainerName,
            ContainerPort = null,
            ContainerImage = config["Docker:ClarityServerImage"] ?? "clarity-server:latest",
            ContainerNetwork = infra.Network,
            NginxConfPath = nginxConfPath,
            PublicUrl = publicUrl,
            // NOTE(review): hard-coded; presumably the name of the final saga step - confirm
            // it still matches the registered step list.
            LastProvisioningStep = "LaunchStep",
            // NOTE(review): CompletedSteps is not updated anywhere in this class;
            // presumably the steps or ProvisioningJob maintain it - confirm.
            ProvisioningNotes = $"Provisioned at {completedAt:o}. All {job.CompletedSteps} steps completed.",
        };

        // AppSettings - Keycloak values come from the infra options, the stack name from the saga.
        record.SetAppSetting("Keycloak:Realm", $"clarity-{job.Subdomain.ToLowerInvariant()}");
        record.SetAppSetting("Keycloak:BaseUrl", infra.KeycloakPublicUrl);
        record.SetAppSetting("Keycloak:InternalUrl", infra.KeycloakInternalUrl);
        if (!string.IsNullOrWhiteSpace(sagaContext.TenantStackName))
        {
            record.SetAppSetting("Pulumi:StackName", sagaContext.TenantStackName);
        }

        // ConnectionStrings - only present once a step has put one on the saga context
        // (the original note attributes this to MigrationStep).
        if (!string.IsNullOrWhiteSpace(sagaContext.TenantConnectionString))
        {
            record.SetConnectionString("TenantDb", sagaContext.TenantConnectionString);
        }

        // Persist the record (per the original author's note: ClientAssets/{subdomain}.xml).
        registry.Save(record);
    }

    /// <summary>
    /// Compensates the executed steps in reverse order of execution. A failing compensation
    /// is logged and reported, and the remaining steps are still rolled back. Progress events
    /// published from here are best-effort so a bus failure cannot skip or abort a rollback.
    /// </summary>
    /// <param name="sagaContext">Context shared with the steps.</param>
    /// <param name="executedSteps">Steps that completed, most recent on top; drained by this call.</param>
    /// <param name="cancellationToken">Token passed to each step's <c>CompensateAsync</c>.</param>
    private async Task CompensateAsync(SagaContext sagaContext, Stack<ISagaStep> executedSteps, CancellationToken cancellationToken)
    {
        var jobId = sagaContext.Job.Id;

        while (executedSteps.TryPop(out var step))
        {
            var stepName = step.StepName;

            logger.LogInformation("[{JobId}] Compensating: {Step}", jobId, stepName);
            await TryNotifyAsync(jobId, () => Publish(jobId, "compensation_started", stepName, $"Rolling back: {stepName}"));

            try
            {
                await step.CompensateAsync(sagaContext, cancellationToken);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "[{JobId}] Compensation failed for {Step} - manual intervention required", jobId, stepName);
                await TryNotifyAsync(jobId, () => PublishDiagnostic(jobId, $"{stepName} (compensation)", ex));
                continue;
            }

            await TryNotifyAsync(jobId, () => Publish(jobId, "compensation_complete", stepName, $"Rolled back: {stepName}"));
        }
    }

    /// <summary>
    /// Runs a publish delegate and logs, rather than rethrows, any exception it raises.
    /// Used on failure-handling paths where a notification must not interrupt rollback.
    /// </summary>
    /// <param name="jobId">Job id used in the log message.</param>
    /// <param name="publish">Delegate that performs the publish.</param>
    private async Task TryNotifyAsync(Guid jobId, Func<Task> publish)
    {
        try
        {
            await publish();
        }
        catch (Exception ex)
        {
            logger.LogWarning(ex, "[{JobId}] Failed to publish a progress event during failure handling", jobId);
        }
    }

    /// <summary>Publishes a progress event of the given type for the job.</summary>
    private Task Publish(Guid jobId, string type, string? step, string? message) =>
        bus.Publish(new ProvisioningProgressEvent
        {
            JobId = jobId,
            Type = type,
            Step = step,
            Message = message
        });

    /// <summary>
    /// Publishes a <c>diagnostic</c> progress event carrying the exception message and its
    /// full <c>ToString()</c> output as the detail.
    /// </summary>
    private Task PublishDiagnostic(Guid jobId, string? step, Exception ex) =>
        bus.Publish(new ProvisioningProgressEvent
        {
            JobId = jobId,
            Type = "diagnostic",
            Step = step,
            Message = ex.Message,
            Detail = ex.ToString()
        });
}