OPC # 0001: Extract OPC into standalone repo
This commit is contained in:
@@ -0,0 +1,167 @@
|
||||
using ControlPlane.Core.Config;
|
||||
using ControlPlane.Core.Interfaces;
|
||||
using ControlPlane.Core.Messages;
|
||||
using ControlPlane.Core.Models;
|
||||
using ControlPlane.Core.Services;
|
||||
using MassTransit;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace ControlPlane.Worker;
|
||||
|
||||
/// <summary>
|
||||
/// MassTransit consumer. Triggered by ProvisionClientCommand off RabbitMQ.
|
||||
/// Runs the saga and publishes ProvisioningProgressEvent for each step transition.
|
||||
/// </summary>
|
||||
public sealed class ProvisioningConsumer(
|
||||
IEnumerable<ISagaStep> steps,
|
||||
IPublishEndpoint bus,
|
||||
IConfiguration config,
|
||||
IOptions<ClarityInfraOptions> infraOptions,
|
||||
TenantRegistryService registry,
|
||||
ILogger<ProvisioningConsumer> logger) : IConsumer<ProvisionClientCommand>
|
||||
{
|
||||
public async Task Consume(ConsumeContext<ProvisionClientCommand> context)
|
||||
{
|
||||
var cmd = context.Message;
|
||||
var job = new ProvisioningJob
|
||||
{
|
||||
Id = cmd.JobId,
|
||||
ClientName = cmd.ClientName,
|
||||
StateCode = cmd.StateCode,
|
||||
Subdomain = cmd.Subdomain,
|
||||
AdminEmail = cmd.AdminEmail,
|
||||
SiteCode = cmd.SiteCode,
|
||||
Environment = cmd.Environment,
|
||||
Status = ProvisioningStatus.Running
|
||||
};
|
||||
|
||||
logger.LogInformation("Starting provisioning saga for job {JobId} ({Client})", job.Id, job.ClientName);
|
||||
await RunSagaAsync(job, context.CancellationToken);
|
||||
}
|
||||
|
||||
private async Task RunSagaAsync(ProvisioningJob job, CancellationToken cancellationToken)
|
||||
{
|
||||
var sagaContext = new SagaContext { Job = job };
|
||||
var executedSteps = new Stack<ISagaStep>();
|
||||
|
||||
foreach (var step in steps)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Publish(job.Id, "step_started", step.StepName, $"Starting: {step.StepName}");
|
||||
logger.LogInformation("[{JobId}] Executing: {Step}", job.Id, step.StepName);
|
||||
|
||||
await step.ExecuteAsync(sagaContext, cancellationToken);
|
||||
executedSteps.Push(step);
|
||||
|
||||
await Publish(job.Id, "step_complete", step.StepName, $"Completed: {step.StepName}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "[{JobId}] Step {Step} failed", job.Id, step.StepName);
|
||||
await Publish(job.Id, "step_failed", step.StepName, $"Failed: {step.StepName} - {ex.Message}");
|
||||
await PublishDiagnostic(job.Id, step.StepName, ex);
|
||||
|
||||
job.Status = ProvisioningStatus.Compensating;
|
||||
job.FailureReason = $"{step.StepName}: {ex.Message}";
|
||||
|
||||
await CompensateAsync(sagaContext, executedSteps, cancellationToken);
|
||||
|
||||
job.Status = ProvisioningStatus.Failed;
|
||||
await Publish(job.Id, "job_failed", null, job.FailureReason);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
job.Status = ProvisioningStatus.Completed;
|
||||
job.CompletedAt = DateTimeOffset.UtcNow;
|
||||
await Publish(job.Id, "job_complete", null, "All steps completed successfully.");
|
||||
|
||||
var infra = infraOptions.Value;
|
||||
var apiBaseUrl = sagaContext.TenantApiBaseUrl
|
||||
?? infra.TenantPublicUrl(job.Subdomain);
|
||||
|
||||
// Persist to ClientAssets/{subdomain}.xml
|
||||
var nginxConfPath = config["Nginx:ConfDPath"] is { } p
|
||||
? Path.Combine(p, $"{job.Subdomain}.conf")
|
||||
: null;
|
||||
|
||||
var record = new TenantRecord
|
||||
{
|
||||
JobId = job.Id.ToString(),
|
||||
Subdomain = job.Subdomain,
|
||||
ClientName = job.ClientName,
|
||||
StateCode = job.StateCode,
|
||||
AdminEmail = job.AdminEmail,
|
||||
SiteCode = job.SiteCode,
|
||||
Environment = job.Environment,
|
||||
Tier = job.Tier.ToString(),
|
||||
ApiBaseUrl = apiBaseUrl,
|
||||
Status = "Provisioned",
|
||||
ProvisionedAt = job.CompletedAt!.Value.ToString("o"),
|
||||
ContainerName = sagaContext.ContainerName,
|
||||
ContainerPort = null,
|
||||
ContainerImage = config["Docker:ClarityServerImage"] ?? "clarity-server:latest",
|
||||
ContainerNetwork = infra.Network,
|
||||
NginxConfPath = nginxConfPath,
|
||||
PublicUrl = infra.TenantPublicUrl(job.Subdomain),
|
||||
LastProvisioningStep = "LaunchStep",
|
||||
ProvisioningNotes = $"Provisioned at {job.CompletedAt:o}. All {job.CompletedSteps} steps completed.",
|
||||
};
|
||||
|
||||
// AppSettings — enriched by each step via SagaContext
|
||||
record.SetAppSetting("Keycloak:Realm", $"clarity-{job.Subdomain.ToLowerInvariant()}");
|
||||
record.SetAppSetting("Keycloak:BaseUrl", infra.KeycloakPublicUrl);
|
||||
record.SetAppSetting("Keycloak:InternalUrl", infra.KeycloakInternalUrl);
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(sagaContext.TenantStackName))
|
||||
record.SetAppSetting("Pulumi:StackName", sagaContext.TenantStackName);
|
||||
|
||||
// ConnectionStrings — written by MigrationStep once DB is provisioned
|
||||
if (!string.IsNullOrWhiteSpace(sagaContext.TenantConnectionString))
|
||||
record.SetConnectionString("TenantDb", sagaContext.TenantConnectionString);
|
||||
|
||||
registry.Save(record);
|
||||
|
||||
logger.LogInformation("[{JobId}] Provisioning completed. Tenant record saved.", job.Id);
|
||||
}
|
||||
|
||||
private async Task CompensateAsync(SagaContext sagaContext, Stack<ISagaStep> executedSteps, CancellationToken cancellationToken)
|
||||
{
|
||||
while (executedSteps.TryPop(out var step))
|
||||
{
|
||||
try
|
||||
{
|
||||
logger.LogInformation("[{JobId}] Compensating: {Step}", sagaContext.Job.Id, step.StepName);
|
||||
await Publish(sagaContext.Job.Id, "compensation_started", step.StepName, $"Rolling back: {step.StepName}");
|
||||
await step.CompensateAsync(sagaContext, cancellationToken);
|
||||
await Publish(sagaContext.Job.Id, "compensation_complete", step.StepName, $"Rolled back: {step.StepName}");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "[{JobId}] Compensation failed for {Step} - manual intervention required", sagaContext.Job.Id, step.StepName);
|
||||
await PublishDiagnostic(sagaContext.Job.Id, $"{step.StepName} (compensation)", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Task Publish(Guid jobId, string type, string? step, string? message) =>
|
||||
bus.Publish(new ProvisioningProgressEvent
|
||||
{
|
||||
JobId = jobId,
|
||||
Type = type,
|
||||
Step = step,
|
||||
Message = message
|
||||
});
|
||||
|
||||
private Task PublishDiagnostic(Guid jobId, string? step, Exception ex) =>
|
||||
bus.Publish(new ProvisioningProgressEvent
|
||||
{
|
||||
JobId = jobId,
|
||||
Type = "diagnostic",
|
||||
Step = step,
|
||||
Message = ex.Message,
|
||||
Detail = ex.ToString()
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user