/**
 * Notification Generation Job
 *
 * Main job (every 15 minutes, first run immediately on start) generates
 * persistent notifications for:
 *   1. Personal atemschutz warnings (untersuchung / leistungstest expiring within 60 days)
 *   2. Vehicle issues (for fahrmeister users)
 *   3. Equipment issues (for fahrmeister if motorised, zeugmeister if not)
 * and afterwards calls notificationService.deleteOldRead() to clean up old
 * read notifications (90 days according to the original job description).
 *
 * A separate job (every 5 minutes, first run delayed by 30 seconds) generates:
 *   4. Nextcloud Talk unread-message notifications
 *
 * Deduplicates via the unique index on (user_id, quell_typ, quell_id) WHERE NOT gelesen.
 */

import pool from '../config/database';
import notificationService from '../services/notification.service';
import nextcloudService from '../services/nextcloud.service';
import logger from '../utils/logger';

const INTERVAL_MS = 15 * 60 * 1000; // 15 minutes
const NEXTCLOUD_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes — keep reasonable to avoid rate-limiting
const NEXTCLOUD_BATCH_DELAY_MS = 300; // ms between user batches to avoid hammering Nextcloud
const NEXTCLOUD_BATCH_SIZE = 3; // users processed concurrently per batch
const STARTUP_DELAY_MS = 30 * 1000; // 30 seconds — avoid burst on container restart
const ATEMSCHUTZ_THRESHOLD = 60; // days
const ATEMSCHUTZ_WARN_DAYS = 14; // at or below this many remaining days the severity is 'warnung'

let jobInterval: ReturnType<typeof setInterval> | null = null;
let nextcloudInterval: ReturnType<typeof setInterval> | null = null;
// Handle of the delayed first Nextcloud run, kept so stopNotificationJob() can cancel it.
let nextcloudStartupTimeout: ReturnType<typeof setTimeout> | null = null;
let isRunning = false;
let isNextcloudRunning = false;

/** Shape of the user rows selected for the Nextcloud Talk job. */
type NextcloudUserRow = {
  id: string;
  nextcloud_login_name: string;
  nextcloud_app_password: string;
};

// ---------------------------------------------------------------------------
// Small pure helpers
// ---------------------------------------------------------------------------

/** Renders an unknown thrown value as a log-friendly message. */
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** True when `error` is an object carrying the given `code` property value. */
function hasErrorCode(error: unknown, code: string): boolean {
  return typeof error === 'object' && error !== null && (error as { code?: unknown }).code === code;
}

/** Maps remaining days to a severity: overdue → fehler, ≤ 14 days → warnung, otherwise info. */
function atemschutzSeverity(daysLeft: number): 'fehler' | 'warnung' | 'info' {
  if (daysLeft < 0) return 'fehler';
  return daysLeft <= ATEMSCHUTZ_WARN_DAYS ? 'warnung' : 'info';
}

/** Parses a nullable "days remaining" column value; null/undefined stays null. */
function parseDaysLeft(value: unknown): number | null {
  return value != null ? parseInt(String(value), 10) : null;
}

// ---------------------------------------------------------------------------
// Core generation function
// ---------------------------------------------------------------------------

/**
 * Runs the atemschutz, vehicle and equipment generators in sequence and then
 * deletes old read notifications. Overlapping invocations are skipped via the
 * `isRunning` flag. Never rejects: unexpected errors are logged.
 */
export async function runNotificationGeneration(): Promise<void> {
  if (isRunning) {
    logger.warn('NotificationGenerationJob: previous run still in progress — skipping');
    return;
  }
  isRunning = true;
  try {
    await generateAtemschutzNotifications();
    await generateVehicleNotifications();
    await generateEquipmentNotifications();
    await notificationService.deleteOldRead();
  } catch (error) {
    logger.error('NotificationGenerationJob: unexpected error', {
      error: errorMessage(error),
    });
  } finally {
    isRunning = false;
  }
}

/**
 * Runs the Nextcloud Talk generator, skipping the run if a previous one is
 * still in progress. Never rejects: unexpected errors are logged.
 */
async function runNextcloudNotificationGeneration(): Promise<void> {
  if (isNextcloudRunning) {
    logger.warn('NotificationGenerationJob: Nextcloud run still in progress — skipping');
    return;
  }
  isNextcloudRunning = true;
  try {
    await generateNextcloudTalkNotifications();
  } catch (error) {
    logger.error('NotificationGenerationJob: Nextcloud unexpected error', {
      error: errorMessage(error),
    });
  } finally {
    isNextcloudRunning = false;
  }
}

// ---------------------------------------------------------------------------
// 1. Atemschutz personal warnings
// ---------------------------------------------------------------------------

/**
 * Creates a personal notification for every user whose untersuchung or
 * leistungstest is due within ATEMSCHUTZ_THRESHOLD days (or already overdue).
 * Errors are logged and not rethrown, so the following generators still run.
 */
async function generateAtemschutzNotifications(): Promise<void> {
  try {
    // Get all atemschutz records together with their remaining-days columns.
    const result = await pool.query(`
      SELECT au.user_id, au.untersuchung_tage_rest, au.leistungstest_tage_rest, u.name AS user_name
      FROM atemschutz_uebersicht au
      JOIN users u ON u.id = au.user_id
      WHERE au.user_id IS NOT NULL
    `);

    for (const row of result.rows) {
      const userId = row.user_id;
      const untTage = parseDaysLeft(row.untersuchung_tage_rest);
      const leiTage = parseDaysLeft(row.leistungstest_tage_rest);

      // A NaN parse result fails the `<=` comparison and is therefore skipped.
      if (untTage !== null && untTage <= ATEMSCHUTZ_THRESHOLD) {
        await notificationService.createNotification({
          user_id: userId,
          typ: 'atemschutz_untersuchung',
          titel: 'Atemschutz-Untersuchung fällig',
          nachricht:
            untTage < 0
              ? `Deine Atemschutz-Untersuchung ist seit ${Math.abs(untTage)} Tagen überfällig.`
              : `Deine Atemschutz-Untersuchung ist in ${untTage} Tagen fällig.`,
          // NOTE(review): cast kept from the original; the service's `schwere` type is not visible here.
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          schwere: atemschutzSeverity(untTage) as any,
          link: '/atemschutz',
          quell_id: `atemschutz-untersuchung-${userId}`,
          quell_typ: 'atemschutz_untersuchung',
        });
      }

      if (leiTage !== null && leiTage <= ATEMSCHUTZ_THRESHOLD) {
        await notificationService.createNotification({
          user_id: userId,
          typ: 'atemschutz_leistungstest',
          titel: 'Atemschutz-Leistungstest fällig',
          nachricht:
            leiTage < 0
              ? `Dein Atemschutz-Leistungstest ist seit ${Math.abs(leiTage)} Tagen überfällig.`
              : `Dein Atemschutz-Leistungstest ist in ${leiTage} Tagen fällig.`,
          // NOTE(review): cast kept from the original; the service's `schwere` type is not visible here.
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          schwere: atemschutzSeverity(leiTage) as any,
          link: '/atemschutz',
          quell_id: `atemschutz-leistungstest-${userId}`,
          quell_typ: 'atemschutz_leistungstest',
        });
      }
    }
  } catch (error) {
    logger.error('NotificationGenerationJob: generateAtemschutzNotifications failed', { error });
  }
}

// ---------------------------------------------------------------------------
// 2. Vehicle issues → fahrmeister users (bulk INSERT)
// ---------------------------------------------------------------------------

/**
 * Bulk-inserts notifications for every active fahrmeister user about
 * (a) vehicles with status beschaedigt / ausser_dienst and
 * (b) vehicles whose inspection is overdue (naechste_pruefung_tage < 0).
 * Existing unread notifications for the same source are skipped via ON CONFLICT.
 * Errors are logged and not rethrown.
 */
async function generateVehicleNotifications(): Promise<void> {
  try {
    await pool.query(`
      INSERT INTO notifications (user_id, typ, titel, nachricht, schwere, link, quell_id, quell_typ)
      SELECT
        u.id,
        'fahrzeug_status',
        'Fahrzeug nicht einsatzbereit',
        CASE
          WHEN f.kurzname IS NOT NULL
            THEN f.bezeichnung || ' (' || f.kurzname || ') hat den Status "' || f.status || '" und ist nicht einsatzbereit.'
          ELSE f.bezeichnung || ' hat den Status "' || f.status || '" und ist nicht einsatzbereit.'
        END,
        'fehler',
        '/fahrzeuge/' || f.id::text,
        'fahrzeug-status-' || f.id::text,
        'fahrzeug_status'
      FROM fahrzeuge f
      CROSS JOIN users u
      WHERE f.deleted_at IS NULL
        AND f.status IN ('beschaedigt', 'ausser_dienst')
        AND u.is_active = TRUE
        AND 'dashboard_fahrmeister' = ANY(u.authentik_groups)
      ON CONFLICT (user_id, quell_typ, quell_id)
        WHERE NOT gelesen AND quell_typ IS NOT NULL AND quell_id IS NOT NULL
      DO NOTHING
    `);

    await pool.query(`
      INSERT INTO notifications (user_id, typ, titel, nachricht, schwere, link, quell_id, quell_typ)
      SELECT
        u.id,
        'fahrzeug_pruefung',
        'Fahrzeugprüfung überfällig',
        CASE
          WHEN f.kurzname IS NOT NULL
            THEN 'Die Prüfung von ' || f.bezeichnung || ' (' || f.kurzname || ') ist seit ' || ABS(f.naechste_pruefung_tage::int) || ' Tagen überfällig.'
          ELSE 'Die Prüfung von ' || f.bezeichnung || ' ist seit ' || ABS(f.naechste_pruefung_tage::int) || ' Tagen überfällig.'
        END,
        'fehler',
        '/fahrzeuge/' || f.id::text,
        'fahrzeug-pruefung-' || f.id::text,
        'fahrzeug_pruefung'
      FROM fahrzeuge f
      CROSS JOIN users u
      WHERE f.deleted_at IS NULL
        AND f.naechste_pruefung_tage IS NOT NULL
        AND f.naechste_pruefung_tage::int < 0
        AND u.is_active = TRUE
        AND 'dashboard_fahrmeister' = ANY(u.authentik_groups)
      ON CONFLICT (user_id, quell_typ, quell_id)
        WHERE NOT gelesen AND quell_typ IS NOT NULL AND quell_id IS NOT NULL
      DO NOTHING
    `);
  } catch (error) {
    logger.error('NotificationGenerationJob: generateVehicleNotifications failed', { error });
  }
}

// ---------------------------------------------------------------------------
// 3. Equipment issues → fahrmeister (motorised) or zeugmeister (bulk INSERT)
// ---------------------------------------------------------------------------

/**
 * Bulk-inserts notifications about equipment that is beschaedigt / ausser_dienst
 * or whose inspection date lies in the past. Recipients are active users in
 * dashboard_fahrmeister when the equipment category is motorised, otherwise
 * active users in dashboard_zeugmeister. Errors are logged and not rethrown.
 */
async function generateEquipmentNotifications(): Promise<void> {
  try {
    await pool.query(`
      INSERT INTO notifications (user_id, typ, titel, nachricht, schwere, link, quell_id, quell_typ)
      SELECT
        u.id,
        'ausruestung_status',
        'Ausrüstung nicht einsatzbereit',
        a.bezeichnung || ' hat den Status "' || a.status || '" und ist nicht einsatzbereit.',
        'fehler',
        '/ausruestung/' || a.id::text,
        'ausruestung-status-' || a.id::text,
        'ausruestung_status'
      FROM ausruestung a
      JOIN ausruestung_kategorien k ON k.id = a.kategorie_id
      JOIN users u ON u.is_active = TRUE AND (
        (k.motorisiert = TRUE AND 'dashboard_fahrmeister' = ANY(u.authentik_groups))
        OR
        (k.motorisiert = FALSE AND 'dashboard_zeugmeister' = ANY(u.authentik_groups))
      )
      WHERE a.deleted_at IS NULL
        AND a.status IN ('beschaedigt', 'ausser_dienst')
      ON CONFLICT (user_id, quell_typ, quell_id)
        WHERE NOT gelesen AND quell_typ IS NOT NULL AND quell_id IS NOT NULL
      DO NOTHING
    `);

    await pool.query(`
      INSERT INTO notifications (user_id, typ, titel, nachricht, schwere, link, quell_id, quell_typ)
      SELECT
        u.id,
        'ausruestung_pruefung',
        'Ausrüstungsprüfung überfällig',
        'Die Prüfung von ' || a.bezeichnung || ' ist seit ' || ABS(a.naechste_pruefung_am::date - CURRENT_DATE) || ' Tagen überfällig.',
        'fehler',
        '/ausruestung/' || a.id::text,
        'ausruestung-pruefung-' || a.id::text,
        'ausruestung_pruefung'
      FROM ausruestung a
      JOIN ausruestung_kategorien k ON k.id = a.kategorie_id
      JOIN users u ON u.is_active = TRUE AND (
        (k.motorisiert = TRUE AND 'dashboard_fahrmeister' = ANY(u.authentik_groups))
        OR
        (k.motorisiert = FALSE AND 'dashboard_zeugmeister' = ANY(u.authentik_groups))
      )
      WHERE a.deleted_at IS NULL
        AND a.naechste_pruefung_am IS NOT NULL
        AND a.naechste_pruefung_am::date < CURRENT_DATE
      ON CONFLICT (user_id, quell_typ, quell_id)
        WHERE NOT gelesen AND quell_typ IS NOT NULL AND quell_id IS NOT NULL
      DO NOTHING
    `);
  } catch (error) {
    logger.error('NotificationGenerationJob: generateEquipmentNotifications failed', { error });
  }
}

// ---------------------------------------------------------------------------
// 4. Nextcloud Talk unread messages — batched concurrency (3 users at a time)
// ---------------------------------------------------------------------------

/**
 * Loads all active users with stored Nextcloud credentials and processes them
 * in batches of NEXTCLOUD_BATCH_SIZE, pausing NEXTCLOUD_BATCH_DELAY_MS between
 * batches. A failure of the initial query propagates to the caller.
 */
async function generateNextcloudTalkNotifications(): Promise<void> {
  const usersResult = await pool.query(`
    SELECT id, nextcloud_login_name, nextcloud_app_password
    FROM users
    WHERE is_active = TRUE
      AND nextcloud_login_name IS NOT NULL
      AND nextcloud_app_password IS NOT NULL
  `);
  const users = usersResult.rows;

  for (let i = 0; i < users.length; i += NEXTCLOUD_BATCH_SIZE) {
    if (i > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, NEXTCLOUD_BATCH_DELAY_MS));
    }
    const batch = users.slice(i, i + NEXTCLOUD_BATCH_SIZE);
    // allSettled: one user's failure must not abort the rest of the batch.
    await Promise.allSettled(batch.map((user) => processNextcloudUser(user)));
  }
}

/**
 * Creates one notification per conversation with unread messages for `user`.
 * When the Nextcloud service reports NEXTCLOUD_AUTH_INVALID, the stored
 * credentials are cleared so the user is no longer polled. All errors are
 * logged; this function does not reject.
 */
async function processNextcloudUser(user: NextcloudUserRow): Promise<void> {
  try {
    const { conversations } = await nextcloudService.getConversations(
      user.nextcloud_login_name,
      user.nextcloud_app_password,
    );

    for (const conv of conversations) {
      if (conv.unreadMessages <= 0) continue;
      await notificationService.createNotification({
        user_id: user.id,
        typ: 'nextcloud_talk',
        titel: conv.displayName,
        nachricht: `${conv.unreadMessages} ungelesene Nachrichten`,
        schwere: 'info',
        link: conv.url,
        quell_id: conv.token,
        quell_typ: 'nextcloud_talk',
      });
    }
  } catch (error: unknown) {
    if (hasErrorCode(error, 'NEXTCLOUD_AUTH_INVALID')) {
      try {
        await pool.query(
          `UPDATE users SET nextcloud_login_name = NULL, nextcloud_app_password = NULL WHERE id = $1`,
          [user.id],
        );
        logger.warn('NotificationGenerationJob: cleared invalid Nextcloud credentials', { userId: user.id });
      } catch (clearError) {
        // Without this catch the rejection would be swallowed by Promise.allSettled unlogged.
        logger.error('NotificationGenerationJob: failed to clear invalid Nextcloud credentials', {
          userId: user.id,
          error: errorMessage(clearError),
        });
      }
      return;
    }
    logger.error('NotificationGenerationJob: generateNextcloudTalkNotifications failed for user', {
      userId: user.id,
      error,
    });
  }
}

// ---------------------------------------------------------------------------
// Job lifecycle
// ---------------------------------------------------------------------------

/**
 * Starts both schedules. The main job runs once immediately and then every
 * INTERVAL_MS; the Nextcloud job first runs after STARTUP_DELAY_MS and then
 * every NEXTCLOUD_INTERVAL_MS. Calling this while already started is a no-op.
 */
export function startNotificationJob(): void {
  if (jobInterval !== null) {
    logger.warn('Notification generation job already running — skipping duplicate start');
    return;
  }

  // Run main job once on startup, then repeat. The run functions never reject,
  // so `void` only marks the deliberate fire-and-forget.
  void runNotificationGeneration();

  // Delay initial Nextcloud run to avoid a burst on container restart.
  nextcloudStartupTimeout = setTimeout(() => {
    nextcloudStartupTimeout = null;
    void runNextcloudNotificationGeneration();
  }, STARTUP_DELAY_MS);

  jobInterval = setInterval(() => {
    void runNotificationGeneration();
  }, INTERVAL_MS);

  nextcloudInterval = setInterval(() => {
    void runNextcloudNotificationGeneration();
  }, NEXTCLOUD_INTERVAL_MS);

  // Derive the minutes from the constants so the message cannot drift from the schedule.
  const mainMinutes = INTERVAL_MS / 60_000;
  const nextcloudMinutes = NEXTCLOUD_INTERVAL_MS / 60_000;
  logger.info(
    `Notification generation jobs scheduled (main: ${mainMinutes}min, Nextcloud Talk: ${nextcloudMinutes}min)`,
  );
}

/**
 * Stops both schedules and cancels a still-pending delayed first Nextcloud run.
 * A run that is already in progress is not interrupted.
 */
export function stopNotificationJob(): void {
  if (jobInterval !== null) {
    clearInterval(jobInterval);
    jobInterval = null;
  }
  if (nextcloudInterval !== null) {
    clearInterval(nextcloudInterval);
    nextcloudInterval = null;
  }
  if (nextcloudStartupTimeout !== null) {
    clearTimeout(nextcloudStartupTimeout);
    nextcloudStartupTimeout = null;
  }
  logger.info('Notification generation jobs stopped');
}