using ControlPlane.Core.Config;
using ControlPlane.Core.Interfaces;
using ControlPlane.Core.Messages;
using ControlPlane.Core.Models;
using ControlPlane.Core.Services;
using MassTransit;
using Microsoft.Extensions.Options;

namespace ControlPlane.Worker;

/// <summary>
/// MassTransit consumer. Triggered by ProvisionClientCommand off RabbitMQ.
/// Runs the saga and publishes ProvisioningProgressEvent for each step transition.
/// </summary>
/// <remarks>
/// Steps are executed in the order the injected sequence yields them. When a step throws,
/// the steps that had already completed are compensated in reverse order, the job is marked
/// failed and no tenant record is written. When every step succeeds, a tenant record is
/// built from the job, the saga context and configuration, and saved through the registry.
/// </remarks>
// NOTE(review): the generic type arguments were missing from the text this file was
// reconstructed from. ProvisionClientCommand comes from the summary above and
// ILogger<ProvisioningConsumer> is the usual logger category; ISagaStep and InfraOptions
// are best guesses - confirm both against ControlPlane.Core before merging.
public sealed class ProvisioningConsumer(
    IEnumerable<ISagaStep> steps,
    IPublishEndpoint bus,
    IConfiguration config,
    IOptions<InfraOptions> infraOptions,
    TenantRegistryService registry,
    ILogger<ProvisioningConsumer> logger) : IConsumer<ProvisionClientCommand>
{
    /// <summary>
    /// Maps the incoming command onto a new <c>ProvisioningJob</c> in the Running state and runs the saga for it.
    /// </summary>
    /// <param name="context">Consume context carrying the command and the cancellation token handed to the steps.</param>
    public async Task Consume(ConsumeContext<ProvisionClientCommand> context)
    {
        var cmd = context.Message;

        var job = new ProvisioningJob
        {
            Id = cmd.JobId,
            ClientName = cmd.ClientName,
            StateCode = cmd.StateCode,
            Subdomain = cmd.Subdomain,
            AdminEmail = cmd.AdminEmail,
            SiteCode = cmd.SiteCode,
            Environment = cmd.Environment,
            Status = ProvisioningStatus.Running
        };

        logger.LogInformation("Starting provisioning saga for job {JobId} ({Client})", job.Id, job.ClientName);

        await RunSagaAsync(job, context.CancellationToken);
    }

    /// <summary>
    /// Executes every step in order, publishing a progress event around each one.
    /// On the first failure it compensates the completed steps and returns without saving a tenant record.
    /// </summary>
    /// <param name="job">Job being provisioned; its status, failure reason and completion time are updated here.</param>
    /// <param name="cancellationToken">Token passed to each step's <c>ExecuteAsync</c>.</param>
    private async Task RunSagaAsync(ProvisioningJob job, CancellationToken cancellationToken)
    {
        var sagaContext = new SagaContext { Job = job };
        var executedSteps = new Stack<ISagaStep>();

        foreach (var step in steps)
        {
            try
            {
                await Publish(job.Id, "step_started", step.StepName, $"Starting: {step.StepName}");
                logger.LogInformation("[{JobId}] Executing: {Step}", job.Id, step.StepName);

                await step.ExecuteAsync(sagaContext, cancellationToken);

                // Only steps that ran to completion are recorded, so the step that throws
                // is not itself part of the rollback below.
                executedSteps.Push(step);
                await Publish(job.Id, "step_complete", step.StepName, $"Completed: {step.StepName}");
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "[{JobId}] Step {Step} failed", job.Id, step.StepName);
                await Publish(job.Id, "step_failed", step.StepName, $"Failed: {step.StepName} - {ex.Message}");
                await PublishDiagnostic(job.Id, step.StepName, ex);

                job.Status = ProvisioningStatus.Compensating;
                job.FailureReason = $"{step.StepName}: {ex.Message}";

                // Roll back with a token that cannot be cancelled: if the failure above was
                // caused by cancellationToken firing, reusing it would hand every
                // compensation an already-cancelled token and the rollback would be skipped.
                await CompensateAsync(sagaContext, executedSteps, CancellationToken.None);

                job.Status = ProvisioningStatus.Failed;
                await Publish(job.Id, "job_failed", null, job.FailureReason);
                return;
            }
        }

        // One timestamp is used for the job and for the persisted record so they always agree.
        var completedAt = DateTimeOffset.UtcNow;
        job.Status = ProvisioningStatus.Completed;
        job.CompletedAt = completedAt;
        await Publish(job.Id, "job_complete", null, "All steps completed successfully.");

        SaveTenantRecord(job, sagaContext, completedAt);
        logger.LogInformation("[{JobId}] Provisioning completed. Tenant record saved.", job.Id);
    }

    /// <summary>
    /// Builds the tenant record for a successfully provisioned job and saves it through the registry.
    /// </summary>
    /// <param name="job">Completed job supplying the tenant's identity fields.</param>
    /// <param name="sagaContext">Saga context supplying the values produced while the steps ran.</param>
    /// <param name="completedAt">Completion timestamp written into the record.</param>
    private void SaveTenantRecord(ProvisioningJob job, SagaContext sagaContext, DateTimeOffset completedAt)
    {
        var infra = infraOptions.Value;
        var publicUrl = infra.TenantPublicUrl(job.Subdomain);

        // Prefer the API base URL reported through the saga context; fall back to the public URL.
        var apiBaseUrl = sagaContext.TenantApiBaseUrl ?? publicUrl;

        // The nginx conf path is only recorded when Nginx:ConfDPath is configured.
        var nginxConfPath = config["Nginx:ConfDPath"] is { } confDir
            ? Path.Combine(confDir, $"{job.Subdomain}.conf")
            : null;

        // Persist to ClientAssets/{subdomain}.xml
        var record = new TenantRecord
        {
            JobId = job.Id.ToString(),
            Subdomain = job.Subdomain,
            ClientName = job.ClientName,
            StateCode = job.StateCode,
            AdminEmail = job.AdminEmail,
            SiteCode = job.SiteCode,
            Environment = job.Environment,
            Tier = job.Tier.ToString(),
            ApiBaseUrl = apiBaseUrl,
            Status = "Provisioned",
            ProvisionedAt = completedAt.ToString("o"),
            ContainerName = sagaContext.ContainerName,
            ContainerPort = null,
            ContainerImage = config["Docker:ClarityServerImage"] ?? "clarity-server:latest",
            ContainerNetwork = infra.Network,
            NginxConfPath = nginxConfPath,
            PublicUrl = publicUrl,
            LastProvisioningStep = "LaunchStep",
            ProvisioningNotes = $"Provisioned at {completedAt:o}. All {job.CompletedSteps} steps completed.",
        };

        // AppSettings — enriched by each step via SagaContext
        record.SetAppSetting("Keycloak:Realm", $"clarity-{job.Subdomain.ToLowerInvariant()}");
        record.SetAppSetting("Keycloak:BaseUrl", infra.KeycloakPublicUrl);
        record.SetAppSetting("Keycloak:InternalUrl", infra.KeycloakInternalUrl);
        if (!string.IsNullOrWhiteSpace(sagaContext.TenantStackName))
        {
            record.SetAppSetting("Pulumi:StackName", sagaContext.TenantStackName);
        }

        // ConnectionStrings — written by MigrationStep once DB is provisioned
        if (!string.IsNullOrWhiteSpace(sagaContext.TenantConnectionString))
        {
            record.SetConnectionString("TenantDb", sagaContext.TenantConnectionString);
        }

        registry.Save(record);
    }

    /// <summary>
    /// Compensates the executed steps in reverse order of execution (most recent first).
    /// A failing compensation is logged and reported as a diagnostic event, then the loop moves on to the next step.
    /// </summary>
    /// <param name="sagaContext">Saga context passed to each step's <c>CompensateAsync</c>.</param>
    /// <param name="executedSteps">Steps that completed; the stack is drained by this call.</param>
    /// <param name="cancellationToken">Token passed to each step's <c>CompensateAsync</c>.</param>
    private async Task CompensateAsync(SagaContext sagaContext, Stack<ISagaStep> executedSteps, CancellationToken cancellationToken)
    {
        while (executedSteps.TryPop(out var step))
        {
            try
            {
                logger.LogInformation("[{JobId}] Compensating: {Step}", sagaContext.Job.Id, step.StepName);
                await Publish(sagaContext.Job.Id, "compensation_started", step.StepName, $"Rolling back: {step.StepName}");

                await step.CompensateAsync(sagaContext, cancellationToken);

                await Publish(sagaContext.Job.Id, "compensation_complete", step.StepName, $"Rolled back: {step.StepName}");
            }
            catch (Exception ex)
            {
                // Deliberately not rethrown so the remaining steps still get their rollback attempt.
                logger.LogError(ex, "[{JobId}] Compensation failed for {Step} - manual intervention required", sagaContext.Job.Id, step.StepName);
                await PublishDiagnostic(sagaContext.Job.Id, $"{step.StepName} (compensation)", ex);
            }
        }
    }

    /// <summary>Publishes a progress event of the given type for the job.</summary>
    private Task Publish(Guid jobId, string type, string? step, string? message) =>
        bus.Publish(new ProvisioningProgressEvent
        {
            JobId = jobId,
            Type = type,
            Step = step,
            Message = message
        });

    /// <summary>Publishes a "diagnostic" progress event carrying the exception message and its full text.</summary>
    private Task PublishDiagnostic(Guid jobId, string? step, Exception ex) =>
        bus.Publish(new ProvisioningProgressEvent
        {
            JobId = jobId,
            Type = "diagnostic",
            Step = step,
            Message = ex.Message,
            Detail = ex.ToString()
        });
}