using ControlPlane.Core.Interfaces;
using ControlPlane.Core.Models;
using Npgsql;
namespace ControlPlane.Worker.Steps;
/// <summary>
/// Provisions a per-tenant Postgres database on the shared Postgres instance.
/// Writes TenantConnectionString to SagaContext for downstream steps (LaunchStep).
/// Compensation drops the database.
/// </summary>
public class MigrationStep(
    IConfiguration config,
    ILogger logger) : ISagaStep
{
    /// <summary>
    /// Postgres SQLSTATE for <c>duplicate_database</c>, raised by CREATE DATABASE when the
    /// target database already exists.
    /// </summary>
    private const string DuplicateDatabaseSqlState = "42P04";

    /// <summary>
    /// Display name of this saga step. Note that EF Core migrations are not run yet
    /// (see the TODO in <see cref="ExecuteAsync"/>); the step currently only creates the database.
    /// </summary>
    public string StepName => "Database Migration & Seeding (EF Core)";

    /// <summary>
    /// Creates the tenant database (if it does not already exist), stores the tenant
    /// connection string on <paramref name="context"/> and flags the job as
    /// <c>DatabaseMigrated</c>.
    /// </summary>
    /// <param name="context">Saga state; <c>Job.Subdomain</c> determines the database name.</param>
    /// <param name="cancellationToken">Flows into every database call.</param>
    /// <exception cref="InvalidOperationException">
    /// The <c>platformdb</c> connection string is not configured.
    /// </exception>
    public async Task ExecuteAsync(SagaContext context, CancellationToken cancellationToken)
    {
        var job = context.Job;
        var dbName = TenantDbName(job.Subdomain);
        var adminConnStr = config.GetConnectionString("platformdb")
            ?? throw new InvalidOperationException(
                "ConnectionStrings:platformdb is missing. " +
                "Ensure ControlPlane.Worker appsettings.json has a platformdb connection string.");

        logger.LogInformation("[{JobId}] Provisioning database '{Db}'.", job.Id, dbName);
        await CreateDatabaseIfNotExistsAsync(adminConnStr, dbName, cancellationToken);

        // Set immediately after creation: CompensateAsync uses this value as its
        // "there may be a database to drop" marker.
        context.TenantConnectionString = BuildTenantConnectionString(adminConnStr, dbName);
        logger.LogInformation("[{JobId}] Database '{Db}' ready.", job.Id, dbName);

        // TODO: Run EF Core migrations once dynamic DbContext is wired:
        // var opts = new DbContextOptionsBuilder().UseNpgsql(context.TenantConnectionString).Options;
        // await using var db = new ApplicationDbContext(opts);
        // await db.Database.MigrateAsync(cancellationToken);
        context.Job.CompletedSteps |= CompletedSteps.DatabaseMigrated;
    }

    /// <summary>
    /// Best-effort rollback: terminates other sessions connected to the tenant database and
    /// drops it. Does nothing when no tenant connection string was recorded or when the
    /// <c>platformdb</c> connection string is unavailable. Failures are logged, never rethrown.
    /// </summary>
    /// <param name="context">Saga state; <c>Job.Subdomain</c> determines the database name.</param>
    /// <param name="cancellationToken">Flows into every database call.</param>
    public async Task CompensateAsync(SagaContext context, CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(context.TenantConnectionString))
        {
            return;
        }

        var dbName = TenantDbName(context.Job.Subdomain);
        var adminConnStr = config.GetConnectionString("platformdb");
        if (string.IsNullOrWhiteSpace(adminConnStr))
        {
            return;
        }

        logger.LogWarning("[{JobId}] Compensating: dropping database '{Db}'.", context.Job.Id, dbName);
        try
        {
            await using var conn = new NpgsqlConnection(adminConnStr);
            await conn.OpenAsync(cancellationToken);

            // DROP DATABASE fails while other sessions are connected, so kick them first.
            // The name is passed as a positional parameter (same style as the existence
            // check in CreateDatabaseIfNotExistsAsync) rather than spliced into the SQL text.
            await using var terminate = conn.CreateCommand();
            terminate.CommandText = """
                SELECT pg_terminate_backend(pid)
                FROM pg_stat_activity
                WHERE datname = $1 AND pid <> pg_backend_pid();
                """;
            terminate.Parameters.AddWithValue(dbName);
            await terminate.ExecuteNonQueryAsync(cancellationToken);

            // Identifiers cannot be parameterized; quote and escape instead.
            await using var drop = conn.CreateCommand();
            drop.CommandText = $"DROP DATABASE IF EXISTS {QuoteIdentifier(dbName)};";
            await drop.ExecuteNonQueryAsync(cancellationToken);

            logger.LogInformation("[{JobId}] Dropped database '{Db}'.", context.Job.Id, dbName);
        }
        catch (Exception ex)
        {
            // Deliberately swallowed: compensation must not throw out of the rollback path.
            logger.LogError(ex, "[{JobId}] Failed to drop database '{Db}' during compensation.", context.Job.Id, dbName);
        }
    }

    // ── helpers ──────────────────────────────────────────────────────────────

    /// <summary>
    /// Deterministic database name for a tenant subdomain: dashes become underscores, the
    /// result is lower-cased and prefixed with <c>clarity_</c>.
    /// Example: <c>fdev-app-clarity-01000014</c> → <c>clarity_fdev_app_clarity_01000014</c>.
    /// </summary>
    /// <param name="subdomain">Tenant subdomain; must not be null.</param>
    internal static string TenantDbName(string subdomain) =>
        $"clarity_{subdomain.Replace('-', '_').ToLowerInvariant()}";

    /// <summary>
    /// Wraps <paramref name="identifier"/> in double quotes, doubling any embedded double
    /// quote, so it can be placed in DDL where parameters are not allowed.
    /// </summary>
    private static string QuoteIdentifier(string identifier) =>
        "\"" + identifier.Replace("\"", "\"\"") + "\"";

    /// <summary>
    /// Creates <paramref name="dbName"/> using the admin connection unless it already exists.
    /// A concurrent creation that wins the race between the existence check and
    /// CREATE DATABASE is treated as success.
    /// </summary>
    private static async Task CreateDatabaseIfNotExistsAsync(
        string adminConnStr, string dbName, CancellationToken ct)
    {
        await using var conn = new NpgsqlConnection(adminConnStr);
        await conn.OpenAsync(ct);

        await using var check = conn.CreateCommand();
        check.CommandText = "SELECT 1 FROM pg_database WHERE datname = $1;";
        check.Parameters.AddWithValue(dbName);
        var exists = await check.ExecuteScalarAsync(ct) is not null;
        if (exists)
        {
            return;
        }

        await using var create = conn.CreateCommand();
        // CREATE DATABASE cannot take the name as a parameter, so it is quoted and escaped.
        create.CommandText = $"CREATE DATABASE {QuoteIdentifier(dbName)};";
        try
        {
            await create.ExecuteNonQueryAsync(ct);
        }
        catch (PostgresException ex) when (ex.SqlState == DuplicateDatabaseSqlState)
        {
            // Someone else created the database after our check; the desired state holds.
        }
    }

    /// <summary>
    /// Derives the tenant connection string from the admin one by switching the database
    /// and, for loopback hosts, rewriting host/port to the in-network address.
    /// </summary>
    private static string BuildTenantConnectionString(string adminConnStr, string dbName)
    {
        var b = new NpgsqlConnectionStringBuilder(adminConnStr) { Database = dbName };

        // Tenant containers reach Postgres via the Aspire shared network using the stable
        // DNS alias "postgres" (the Aspire resource name) at the standard port 5432.
        // The port in the admin connection string is Aspire's random host-side proxy port —
        // reset it to 5432 so the in-network address is correct.
        if (b.Host is "localhost" or "127.0.0.1")
        {
            b.Host = "postgres";
            b.Port = 5432;
        }

        return b.ConnectionString;
    }
}