= Напредна тема: Пропагација на податоци помеѓу централна и локална база = == Опис на напредната тема == Како напредна тема во рамки на проектот '''SCS - Smart City Security''' е имплементиран механизам за пропагација на податоци помеѓу централна и локална база на податоци. Идејата на оваа имплементација е да се симулира Smart City Security систем во кој постои централна база, која ги содржи главните податоци за системот, и локална база, која добива копија од релевантните податоци. Во ваков систем централната база може да се користи на ниво на град или институција, додека локалната база може да припаѓа на одредена полициска станица, зона или организациски дел од системот. Целта на пропагацијата е при внесување нови податоци во централната база, тие контролирано да се префрлат и во локалната база. == Користени табели == За демонстрација на пропагацијата се користат следните табели: * '''gragjanin''' * '''vozilo''' * '''kamera''' * '''prekrsok''' * '''kazna''' Во централната база дополнително е креирана табелата: * '''propagation_log''' Оваа табела служи за евиденција на сите промени што треба да се префрлат од централната во локалната база. == Табела propagation_log == Табелата '''propagation_log''' претставува лог табела во која се запишуваат сите нови записи што треба да се пропагираат. Секој запис во оваа табела содржи информации за: * табелата во која е направена промената * типот на операција * идентификаторот на записот * payload со вредностите на записот * статус на обработка * датум на креирање * датум на обработка * порака за грешка, доколку пропагацијата не успее {{{ CREATE TABLE propagation_log ( log_id SERIAL PRIMARY KEY, tabela VARCHAR(50) NOT NULL, operacija VARCHAR(20) NOT NULL, zapis_id VARCHAR(50) NOT NULL, payload JSONB NOT NULL, status VARCHAR(20) DEFAULT 'PENDING', datum_kreiranje TIMESTAMP DEFAULT CURRENT_TIMESTAMP, datum_obrabotka TIMESTAMP, poraka_greska TEXT ); }}} Статусот може да има една од следните вредности: * '''PENDING''' - записот чека да биде префрлен во локалната база * '''DONE''' - записот е успешно префрлен * '''FAILED''' - настанала грешка при префрлањето == Функција за логирање на промени == За евидентирање на новите записи е креирана функцијата: {{{ fn_log_propagation_insert() }}} Оваа функција се повикува преку trigger-и. Таа го зема новиот внесен ред преку '''NEW''', го претвора во JSONB формат и го запишува во '''propagation_log'''. {{{ CREATE OR REPLACE FUNCTION fn_log_propagation_insert() RETURNS TRIGGER AS $$ DECLARE pk_column TEXT; pk_value TEXT; BEGIN pk_column := TG_ARGV[0]; pk_value := to_jsonb(NEW) ->> pk_column; ``` INSERT INTO propagation_log ( tabela, operacija, zapis_id, payload ) VALUES ( TG_TABLE_NAME, 'INSERT', pk_value, to_jsonb(NEW) ); RETURN NEW; ``` END; $$ LANGUAGE plpgsql; }}} Функцијата како аргумент го добива името на primary key колоната за табелата на која се однесува trigger-от. На тој начин истата функција може да се користи за повеќе табели. Користењето на JSONB овозможува целиот запис да се зачува како payload, без потреба да се креира посебна log табела за секоја табела. == Trigger-и == Во централната база се креирани trigger-и за табелите: * '''gragjanin''' * '''vozilo''' * '''kamera''' * '''prekrsok''' * '''kazna''' Trigger-ите се активираат по секој INSERT во соодветната табела. Тие не ги префрлаат директно податоците во локалната база, туку автоматски додаваат запис во '''propagation_log''' со статус '''PENDING'''. {{{ DROP TRIGGER IF EXISTS trg_log_gragjanin_insert ON gragjanin; DROP TRIGGER IF EXISTS trg_log_vozilo_insert ON vozilo; DROP TRIGGER IF EXISTS trg_log_kamera_insert ON kamera; DROP TRIGGER IF EXISTS trg_log_prekrsok_insert ON prekrsok; DROP TRIGGER IF EXISTS trg_log_kazna_insert ON kazna; CREATE TRIGGER trg_log_gragjanin_insert AFTER INSERT ON gragjanin FOR EACH ROW EXECUTE FUNCTION fn_log_propagation_insert('embg'); CREATE TRIGGER trg_log_vozilo_insert AFTER INSERT ON vozilo FOR EACH ROW EXECUTE FUNCTION fn_log_propagation_insert('registarska_oznaka'); CREATE TRIGGER trg_log_kamera_insert AFTER INSERT ON kamera FOR EACH ROW EXECUTE FUNCTION fn_log_propagation_insert('kamera_id'); CREATE TRIGGER trg_log_prekrsok_insert AFTER INSERT ON prekrsok FOR EACH ROW EXECUTE FUNCTION fn_log_propagation_insert('prekrsok_id'); CREATE TRIGGER trg_log_kazna_insert AFTER INSERT ON kazna FOR EACH ROW EXECUTE FUNCTION fn_log_propagation_insert('kazna_id'); }}} Со ова при секој нов INSERT во некоја од наведените табели автоматски се креира запис во '''propagation_log'''. == Процедура за пропагација == За префрлање на податоците од централната во локалната база е креирана процедурата: {{{ sp_propagiraj_vo_lokalna_baza() }}} Процедурата ги зема сите записи од '''propagation_log''' со статус '''PENDING'''. За секој запис проверува од која табела доаѓа, го чита payload-от и го внесува соодветниот запис во локалната база. Ако внесувањето е успешно, статусот во '''propagation_log''' се менува во '''DONE'''. Ако настане грешка, статусот се менува во '''FAILED''' и се запишува пораката за грешка. {{{ CREATE OR REPLACE PROCEDURE sp_propagiraj_vo_lokalna_baza() LANGUAGE plpgsql AS $$ DECLARE r RECORD; p JSONB; conn TEXT := 'host=127.0.0.1 port=5430 dbname=scs_local user=postgres password=postgres'; BEGIN FOR r IN SELECT * FROM propagation_log WHERE status = 'PENDING' ORDER BY log_id LOOP BEGIN p := r.payload; ``` IF r.tabela = 'gragjanin' THEN PERFORM dblink_exec( conn, format( 'INSERT INTO gragjanin (embg, ime, prezime, adresa, grad, telefonski_broj, datum_ragjanje, pol) VALUES (%L, %L, %L, %L, %L, %L, %L, %L) ON CONFLICT (embg) DO NOTHING', p ->> 'embg', p ->> 'ime', p ->> 'prezime', p ->> 'adresa', p ->> 'grad', p ->> 'telefonski_broj', p ->> 'datum_ragjanje', p ->> 'pol' ) ); ELSIF r.tabela = 'vozilo' THEN PERFORM dblink_exec( conn, format( 'INSERT INTO vozilo (registarska_oznaka, marka, model, boja, embg_sopstvenik) VALUES (%L, %L, %L, %L, %L) ON CONFLICT (registarska_oznaka) DO NOTHING', p ->> 'registarska_oznaka', p ->> 'marka', p ->> 'model', p ->> 'boja', p ->> 'embg_sopstvenik' ) ); ELSIF r.tabela = 'kamera' THEN PERFORM dblink_exec( conn, format( 'INSERT INTO kamera (kamera_id, lokacija, aktivna) VALUES (%L, %L, %L) ON CONFLICT (kamera_id) DO NOTHING', p ->> 'kamera_id', p ->> 'lokacija', p ->> 'aktivna' ) ); ELSIF r.tabela = 'prekrsok' THEN PERFORM dblink_exec( conn, format( 'INSERT INTO prekrsok (prekrsok_id, datum, opis, kamera_id, registarska_oznaka, embg_storitel) VALUES (%L, %L, %L, %L, %L, %L) ON CONFLICT (prekrsok_id) DO NOTHING', p ->> 'prekrsok_id', p ->> 'datum', p ->> 'opis', p ->> 'kamera_id', p ->> 'registarska_oznaka', p ->> 'embg_storitel' ) ); ELSIF r.tabela = 'kazna' THEN PERFORM dblink_exec( conn, format( 'INSERT INTO kazna (kazna_id, prekrsok_id, datum, iznos_za_plakanje, rok_na_plakanje, status) VALUES (%L, %L, %L, %L, %L, %L) ON CONFLICT (kazna_id) DO NOTHING', p ->> 'kazna_id', p ->> 'prekrsok_id', p ->> 'datum', p ->> 'iznos_za_plakanje', p ->> 'rok_na_plakanje', p ->> 'status' ) ); END IF; UPDATE propagation_log SET status = 'DONE', datum_obrabotka = CURRENT_TIMESTAMP, poraka_greska = NULL WHERE log_id = r.log_id; EXCEPTION WHEN OTHERS THEN UPDATE propagation_log SET status = 'FAILED', datum_obrabotka = CURRENT_TIMESTAMP, poraka_greska = SQLERRM WHERE log_id = r.log_id; END; END LOOP; ``` END; $$; }}} == Користење на dblink == Бидејќи централната и локалната база се две одделни PostgreSQL бази, за комуникација помеѓу нив е користен PostgreSQL extension: {{{ dblink }}} Овој extension овозможува од една база да се извршуваат SQL команди кон друга база. {{{ CREATE EXTENSION IF NOT EXISTS dblink; }}} Во процедурата '''sp_propagiraj_vo_lokalna_baza()''' преку '''dblink_exec''' се извршуваат INSERT команди кон локалната база. == Тек на пропагацијата == Целиот процес се одвива во следните чекори: 1. Се внесува нов запис во некоја од табелите во централната база. 2. Trigger автоматски се активира по INSERT операцијата. 3. Trigger-от ја повикува функцијата '''fn_log_propagation_insert()'''. 4. Во '''propagation_log''' се додава запис со статус '''PENDING'''. 5. Се повикува процедурата '''sp_propagiraj_vo_lokalna_baza()'''. 6. Процедурата ги префрла податоците во локалната база. 7. Статусот во '''propagation_log''' се менува во '''DONE'''. == Пример за проверка == За проверка на записите во лог табелата се користи следното барање: {{{ SELECT log_id, tabela, operacija, zapis_id, status, datum_kreiranje, datum_obrabotka, poraka_greska FROM propagation_log ORDER BY log_id; }}} По успешна пропагација, записите во колоната '''status''' треба да имаат вредност '''DONE'''. За проверка дека податоците се префрлени во локалната база се прави SELECT од соодветните табели: {{{ SELECT * FROM gragjanin; SELECT * FROM vozilo; SELECT * FROM kamera; SELECT * FROM prekrsok; SELECT * FROM kazna; }}} == Заклучок == Со оваа напредна тема е имплементиран механизам за пропагација на податоци помеѓу централна и локална база. Решението користи trigger-и, log табела, stored procedure и dblink extension. Овој пристап е соодветен за Smart City Security систем бидејќи овозможува централно внесување на податоци и нивна контролирана синхронизација кон локални бази, што е применливо во системи каде различни институции или зони треба да имаат пристап до релевантни податоци.