import { Pool, PoolClient } from 'pg'; import { FdiskMember, FdiskAusbildung, FdiskBefoerderung, FdiskUntersuchung, FdiskFahrgenehmigung, } from './types'; function log(msg: string) { console.log(`[db] ${new Date().toISOString()} ${msg}`); } /** * Map FDISK Dienstgrad (abbreviation or full name) to the DB enum value. * Returns null if no match found — the field will be left unchanged. */ function mapDienstgrad(raw: string): string | null { const abbrevMap: Record = { 'jfm': 'Jugendfeuerwehrmann', 'pfm': 'Probefeuerwehrmann', 'fm': 'Feuerwehrmann', 'ofm': 'Oberfeuerwehrmann', 'hfm': 'Hauptfeuerwehrmann', 'lm': 'Löschmeister', 'olm': 'Oberlöschmeister', 'hlm': 'Hauptlöschmeister', 'bm': 'Brandmeister', 'obm': 'Oberbrandmeister', 'hbm': 'Hauptbrandmeister', 'bi': 'Brandinspektor', 'obi': 'Oberbrandinspektor', 'vm': 'Verwaltungsmeister', 'ovm': 'Oberverwaltungsmeister', 'hvm': 'Hauptverwaltungsmeister', 'v': 'Verwalter', 'sb': 'Sachbearbeiter', 'asb': 'Abschnittssachbearbeiter', }; const normalized = raw.trim().toLowerCase().replace(/\*/g, ''); // Direct abbreviation match if (abbrevMap[normalized]) return abbrevMap[normalized]; // Ehrendienstgrad: starts with 'e', rest maps to a known abbreviation // e.g. EOLM → Ehren-Oberlöschmeister, EVM → Ehren-Verwaltungsmeister if (normalized.startsWith('e') && normalized.length > 1) { const base = abbrevMap[normalized.slice(1)]; if (base) return `Ehren-${base}`; } // Full name pass-through (case-insensitive) const allValues = Object.values(abbrevMap); const match = allValues.find(v => v.toLowerCase() === normalized); if (match) return match; // Also match Ehren- full names const ehrenMatch = allValues.find(v => `ehren-${v.toLowerCase()}` === normalized); if (ehrenMatch) return `Ehren-${ehrenMatch}`; return null; } /** * Valid Austrian/EU driving license class patterns. * Filters out non-class data that the scraper may pick up from FDISK form fields. */ const VALID_LICENSE_CLASSES = new Set([ 'A', 'A1', 'A2', 'AM', 'B', 'B1', 'BE', 'C', 'C1', 'CE', 'C1E', 'D', 'D1', 'DE', 'D1E', 'F', 'G', 'L', 'T', ]); function isValidLicenseClass(klasse: string): boolean { const normalized = klasse.trim().toUpperCase(); return VALID_LICENSE_CLASSES.has(normalized); } export async function syncToDatabase( pool: Pool, members: FdiskMember[], ausbildungen: FdiskAusbildung[], befoerderungen: FdiskBefoerderung[], untersuchungen: FdiskUntersuchung[], fahrgenehmigungen: FdiskFahrgenehmigung[], force = false ): Promise { const client = await pool.connect(); try { await client.query('BEGIN'); let updated = 0; let unchanged = 0; let forced = 0; let created = 0; let skipped = 0; for (const member of members) { // Find the matching mitglieder_profile by fdisk_standesbuch_nr first, // then fall back to matching by name (given_name + family_name) const profileResult = await client.query<{ user_id: string }>( `SELECT mp.user_id FROM mitglieder_profile mp JOIN users u ON u.id = mp.user_id WHERE mp.fdisk_standesbuch_nr = $1`, [member.standesbuchNr] ); let userId: string | null = null; if (profileResult.rows.length > 0) { userId = profileResult.rows[0].user_id; } else { // Fallback: match by name (case-insensitive), only logged-in users const nameResult = await client.query<{ id: string }>( `SELECT u.id FROM users u JOIN mitglieder_profile mp ON mp.user_id = u.id WHERE LOWER(u.given_name) = LOWER($1) AND LOWER(u.family_name) = LOWER($2)`, [member.vorname, member.zuname] ); if (nameResult.rows.length > 1) { log(`WARN: skipping ${member.vorname} ${member.zuname} (Standesbuch-Nr ${member.standesbuchNr}) — duplicate name match (${nameResult.rows.length} users)`); } else if (nameResult.rows.length === 1) { userId = nameResult.rows[0].id; // Store the Standesbuch-Nr now that we found a match await client.query( `UPDATE mitglieder_profile SET fdisk_standesbuch_nr = $1 WHERE user_id = $2`, [member.standesbuchNr, userId] ); log(`Linked ${member.vorname} ${member.zuname} → Standesbuch-Nr ${member.standesbuchNr}`); } } if (!userId) { // No matching user found — create a new dashboard user pre-seeded from FDISK const insertResult = await client.query<{ id: string }>( `INSERT INTO users (authentik_sub, email, name, given_name, family_name, is_active, sync_source) VALUES ($1, $2, $3, $4, $5, true, 'fdisk') ON CONFLICT (authentik_sub) DO NOTHING RETURNING id`, [ `fdisk:${member.standesbuchNr}`, `fdisk_sync_${member.standesbuchNr}@intern.noreply`, `${member.vorname} ${member.zuname}`, member.vorname, member.zuname, ] ); if (insertResult.rows.length > 0) { userId = insertResult.rows[0].id; } else { // ON CONFLICT hit — user already existed (idempotent re-run); fetch it const existingResult = await client.query<{ id: string }>( `SELECT id FROM users WHERE authentik_sub = $1`, [`fdisk:${member.standesbuchNr}`] ); userId = existingResult.rows[0]?.id ?? null; } if (userId) { await client.query( `INSERT INTO mitglieder_profile (user_id) VALUES ($1) ON CONFLICT (user_id) DO NOTHING`, [userId] ); log(`Created ${member.vorname} ${member.zuname} (${member.standesbuchNr}): new FDISK-only user`); created++; } else { log(`WARN: could not create user for ${member.vorname} ${member.zuname} (${member.standesbuchNr})`); skipped++; continue; } } // Fetch current values to detect what actually changed const currentResult = await client.query<{ status: string; dienstgrad: string | null; eintrittsdatum: string | null; austrittsdatum: string | null; geburtsdatum: string | null; }>( `SELECT status, dienstgrad, to_char(eintrittsdatum, 'YYYY-MM-DD') AS eintrittsdatum, to_char(austrittsdatum, 'YYYY-MM-DD') AS austrittsdatum, to_char(geburtsdatum, 'YYYY-MM-DD') AS geburtsdatum FROM mitglieder_profile WHERE user_id = $1`, [userId] ); const cur = currentResult.rows[0]; // Update mitglieder_profile with FDISK data (core + extended profile fields) const dienstgrad = mapDienstgrad(member.dienstgrad); await client.query( `UPDATE mitglieder_profile SET fdisk_standesbuch_nr = $1, status = $2, eintrittsdatum = COALESCE($3::date, eintrittsdatum), austrittsdatum = $4::date, geburtsdatum = COALESCE($5::date, geburtsdatum), geburtsort = COALESCE($6, geburtsort), geschlecht = COALESCE($7, geschlecht), beruf = COALESCE($8, beruf), wohnort = COALESCE($9, wohnort), plz = COALESCE($10, plz), ${dienstgrad ? 'dienstgrad = $11,' : ''} updated_at = NOW() WHERE user_id = ${dienstgrad ? '$12' : '$11'}`, dienstgrad ? [member.standesbuchNr, member.status, member.eintrittsdatum, member.abmeldedatum, member.geburtsdatum, member.geburtsort, member.geschlecht, member.beruf, member.wohnort, member.plz, dienstgrad, userId] : [member.standesbuchNr, member.status, member.eintrittsdatum, member.abmeldedatum, member.geburtsdatum, member.geburtsort, member.geschlecht, member.beruf, member.wohnort, member.plz, userId] ); // Detect and log what changed const changes: string[] = []; if (cur) { if (cur.status !== member.status) changes.push(`Status ${cur.status}→${member.status}`); if (dienstgrad && cur.dienstgrad !== dienstgrad) changes.push(`Dienstgrad ${cur.dienstgrad ?? '—'}→${dienstgrad}`); if (member.eintrittsdatum && cur.eintrittsdatum !== member.eintrittsdatum) changes.push(`Eintrittsdatum ${cur.eintrittsdatum ?? '—'}→${member.eintrittsdatum}`); if (cur.austrittsdatum !== (member.abmeldedatum ?? null)) changes.push(`Austrittsdatum ${cur.austrittsdatum ?? '—'}→${member.abmeldedatum ?? '—'}`); if (member.geburtsdatum && cur.geburtsdatum !== member.geburtsdatum) changes.push(`Geburtsdatum ${cur.geburtsdatum ?? '—'}→${member.geburtsdatum}`); } if (changes.length > 0) { log(`Updated ${member.vorname} ${member.zuname} (${member.standesbuchNr}): ${changes.join(', ')}`); updated++; } else if (force) { // Force mode: explicitly update timestamp and log even unchanged rows await client.query( `UPDATE mitglieder_profile SET updated_at = NOW() WHERE user_id = $1`, [userId] ); log(`Forced ${member.vorname} ${member.zuname} (${member.standesbuchNr}): ${member.status}, ${dienstgrad ?? member.dienstgrad}, Eintritt ${member.eintrittsdatum ?? '—'}`); forced++; } else { log(`OK ${member.vorname} ${member.zuname} (${member.standesbuchNr}): ${member.status}, ${dienstgrad ?? member.dienstgrad}, Eintritt ${member.eintrittsdatum ?? '—'}`); unchanged++; } } log(`Members: ${updated} changed, ${unchanged} unchanged, ${forced} forced, ${created} created, ${skipped} skipped`); // Build StNr → userId lookup map (single query instead of per-record lookups) const lookupResult = await client.query<{ fdisk_standesbuch_nr: string; user_id: string }>( `SELECT fdisk_standesbuch_nr, user_id FROM mitglieder_profile WHERE fdisk_standesbuch_nr IS NOT NULL` ); const stNrToUserId = new Map( lookupResult.rows.map(r => [r.fdisk_standesbuch_nr, r.user_id]) ); log(`Lookup map: ${stNrToUserId.size} StNr→userId mappings`); // Upsert Ausbildungen const ausbildungStats = await syncAusbildungen(client, ausbildungen, stNrToUserId); log(`Ausbildungen: ${ausbildungStats.neu} neu, ${ausbildungStats.updated} unverändert, ${ausbildungStats.skipped} übersprungen`); // Upsert Beförderungen const befoerderungStats = await syncBefoerderungen(client, befoerderungen, stNrToUserId); log(`Beförderungen: ${befoerderungStats.neu} neu, ${befoerderungStats.updated} unverändert, ${befoerderungStats.skipped} übersprungen`); // Upsert Untersuchungen const untersuchungStats = await syncUntersuchungen(client, untersuchungen, stNrToUserId); log(`Untersuchungen: ${untersuchungStats.neu} neu, ${untersuchungStats.updated} unverändert, ${untersuchungStats.skipped} übersprungen`); // Upsert Fahrgenehmigungen const fahrgenStats = await syncFahrgenehmigungen(client, fahrgenehmigungen, stNrToUserId); log(`Fahrgenehmigungen: ${fahrgenStats.neu} neu, ${fahrgenStats.updated} unverändert, ${fahrgenStats.skipped} übersprungen`); await client.query('COMMIT'); } catch (err) { await client.query('ROLLBACK'); throw err; } finally { client.release(); } } /** * Scans the ausbildung table for AT20 courses with erfolgscode = 'mit Erfolg' * and upserts atemschutz_traeger records accordingly. * Must run AFTER syncToDatabase() has committed — i.e. on the same pool, outside the transaction. */ export async function syncAT20ToAtemschutz(pool: Pool): Promise { // First, log a sample of what's actually stored so we can verify the filter strings match. const sample = await pool.query<{ kurs_kurzbezeichnung: string | null; erfolgscode: string | null; count: string }>( `SELECT kurs_kurzbezeichnung, erfolgscode, COUNT(*)::text AS count FROM ausbildung WHERE kurs_kurzbezeichnung IS NOT NULL GROUP BY kurs_kurzbezeichnung, erfolgscode ORDER BY count DESC LIMIT 20` ); log(`AT20-Sync: kurs_kurzbezeichnung/erfolgscode distribution (top 20):`); for (const row of sample.rows) { log(` kurzbezeichnung=${JSON.stringify(row.kurs_kurzbezeichnung)} erfolgscode=${JSON.stringify(row.erfolgscode)} count=${row.count}`); } const result = await pool.query<{ rowCount: number }>( `INSERT INTO atemschutz_traeger (id, user_id, atemschutz_lehrgang, lehrgang_datum) SELECT uuid_generate_v4(), a.user_id, true, MIN(a.kurs_datum) FROM ausbildung a WHERE TRIM(a.kurs_kurzbezeichnung) = 'AT20' AND TRIM(a.erfolgscode) = 'mit Erfolg' GROUP BY a.user_id ON CONFLICT (user_id) DO UPDATE SET atemschutz_lehrgang = true, lehrgang_datum = COALESCE(atemschutz_traeger.lehrgang_datum, EXCLUDED.lehrgang_datum), updated_at = NOW()` ); log(`AT20-Sync: ${result.rowCount ?? 0} atemschutz_traeger rows upserted`); } async function syncAusbildungen( client: PoolClient, ausbildungen: FdiskAusbildung[], stNrToUserId: Map ): Promise<{ neu: number; updated: number; skipped: number }> { let neu = 0, updated = 0, skipped = 0; for (const ausb of ausbildungen) { const userId = stNrToUserId.get(ausb.standesbuchNr); if (!userId) { skipped++; continue; } const upsertResult = await client.query<{ was_inserted: boolean }>( `INSERT INTO ausbildung (user_id, kursname, kursnummer, kurs_kurzbezeichnung, erfolgscode, kurs_datum, ablaufdatum, ort, bemerkung, fdisk_sync_key) VALUES ($1, $2, $3, $4, $5, $6::date, $7::date, $8, $9, $10) ON CONFLICT (user_id, fdisk_sync_key) DO UPDATE SET kursname = EXCLUDED.kursname, kursnummer = EXCLUDED.kursnummer, kurs_kurzbezeichnung = EXCLUDED.kurs_kurzbezeichnung, erfolgscode = EXCLUDED.erfolgscode, kurs_datum = EXCLUDED.kurs_datum, ablaufdatum = EXCLUDED.ablaufdatum, ort = EXCLUDED.ort, bemerkung = EXCLUDED.bemerkung, updated_at = NOW() RETURNING (xmax = 0) AS was_inserted`, [userId, ausb.kursname, ausb.kursnummer, ausb.kurzbezeichnung, ausb.erfolgscode, ausb.kursDatum, ausb.ablaufdatum, ausb.ort, ausb.bemerkung, ausb.syncKey] ); if (upsertResult.rows[0]?.was_inserted) { log(`New Ausbildung: ${ausb.standesbuchNr} — ${ausb.kursname}${ausb.kursDatum ? ` (${ausb.kursDatum})` : ''}`); neu++; } else { updated++; } } return { neu, updated, skipped }; } async function syncBefoerderungen( client: PoolClient, befoerderungen: FdiskBefoerderung[], stNrToUserId: Map ): Promise<{ neu: number; updated: number; skipped: number }> { let neu = 0, updated = 0, skipped = 0; for (const b of befoerderungen) { const userId = stNrToUserId.get(b.standesbuchNr); if (!userId) { skipped++; continue; } const upsertResult = await client.query<{ was_inserted: boolean }>( `INSERT INTO befoerderungen (user_id, datum, dienstgrad, fdisk_sync_key) VALUES ($1, $2::date, $3, $4) ON CONFLICT (user_id, fdisk_sync_key) DO UPDATE SET datum = EXCLUDED.datum, dienstgrad = EXCLUDED.dienstgrad, updated_at = NOW() RETURNING (xmax = 0) AS was_inserted`, [userId, b.datum, b.dienstgrad, b.syncKey] ); if (upsertResult.rows[0]?.was_inserted) { log(`New Beförderung: ${b.standesbuchNr} — ${b.dienstgrad}${b.datum ? ` (${b.datum})` : ''}`); neu++; } else { updated++; } } return { neu, updated, skipped }; } async function syncUntersuchungen( client: PoolClient, untersuchungen: FdiskUntersuchung[], stNrToUserId: Map ): Promise<{ neu: number; updated: number; skipped: number }> { let neu = 0, updated = 0, skipped = 0; for (const u of untersuchungen) { const userId = stNrToUserId.get(u.standesbuchNr); if (!userId) { skipped++; continue; } const upsertResult = await client.query<{ was_inserted: boolean }>( `INSERT INTO untersuchungen (user_id, datum, anmerkungen, art, ergebnis, fdisk_sync_key) VALUES ($1, $2::date, $3, $4, $5, $6) ON CONFLICT (user_id, fdisk_sync_key) DO UPDATE SET datum = EXCLUDED.datum, anmerkungen = EXCLUDED.anmerkungen, art = EXCLUDED.art, ergebnis = EXCLUDED.ergebnis, updated_at = NOW() RETURNING (xmax = 0) AS was_inserted`, [userId, u.datum, u.anmerkungen, u.art, u.ergebnis, u.syncKey] ); if (upsertResult.rows[0]?.was_inserted) { log(`New Untersuchung: ${u.standesbuchNr} — [${u.art}] ${u.ergebnis ?? '—'}${u.datum ? ` (${u.datum})` : ''} | ${u.anmerkungen ?? ''}`); neu++; } else { updated++; } } return { neu, updated, skipped }; } async function syncFahrgenehmigungen( client: PoolClient, fahrgenehmigungen: FdiskFahrgenehmigung[], stNrToUserId: Map ): Promise<{ neu: number; updated: number; skipped: number }> { let neu = 0, updated = 0, skipped = 0; // One-time cleanup: remove wrongly-stored records from broken parsing // Includes klasse='Ausstellungsdatum' and any klasse that looks like a date (DD.MM.YYYY) const cleaned = await client.query( `DELETE FROM fahrgenehmigungen WHERE klasse = 'Ausstellungsdatum' OR klasse ~ '^\\d{2}\\.\\d{2}\\.\\d{4}$'` ); if (cleaned.rowCount && cleaned.rowCount > 0) { log(`Cleaned up ${cleaned.rowCount} invalid Fahrgenehmigung records (wrong klasse values)`); } for (const f of fahrgenehmigungen) { // J2: Filter out non-class data that the scraper may pick up if (!f.klasse || !isValidLicenseClass(f.klasse)) { log(`Skipping Fahrgenehmigung: invalid klasse "${f.klasse}" for StNr ${f.standesbuchNr}`); skipped++; continue; } const userId = stNrToUserId.get(f.standesbuchNr); if (!userId) { skipped++; continue; } const upsertResult = await client.query<{ was_inserted: boolean }>( `INSERT INTO fahrgenehmigungen (user_id, ausstellungsdatum, gueltig_bis, behoerde, nummer, klasse, fdisk_sync_key) VALUES ($1, $2::date, $3::date, $4, $5, $6, $7) ON CONFLICT (user_id, fdisk_sync_key) DO UPDATE SET ausstellungsdatum = EXCLUDED.ausstellungsdatum, gueltig_bis = EXCLUDED.gueltig_bis, behoerde = EXCLUDED.behoerde, nummer = EXCLUDED.nummer, klasse = EXCLUDED.klasse, updated_at = NOW() RETURNING (xmax = 0) AS was_inserted`, [userId, f.ausstellungsdatum, f.gueltigBis, f.behoerde, f.nummer, f.klasse, f.syncKey] ); if (upsertResult.rows[0]?.was_inserted) { log(`New Fahrgenehmigung: ${f.standesbuchNr} — [${f.klasse}]${f.ausstellungsdatum ? ` (${f.ausstellungsdatum})` : ''}`); neu++; } else { updated++; } } return { neu, updated, skipped }; }