wiki:AdvancedTopic

Version 2 (modified by 231018, 10 hours ago) ( diff )

--

Напредна тема: Пропагација на податоци помеѓу централна и локална база

Опис на напредната тема

Како напредна тема во рамки на проектот SCS - Smart City Security е имплементиран механизам за пропагација на податоци помеѓу централна и локална база на податоци.

Идејата на оваа имплементација е да се симулира Smart City Security систем во кој постои централна база, која ги содржи главните податоци за системот, и локална база, која добива копија од релевантните податоци.

Во ваков систем централната база може да се користи на ниво на град или институција, додека локалната база може да припаѓа на одредена полициска станица, зона или организациски дел од системот.

Целта на пропагацијата е при внесување нови податоци во централната база, тие контролирано да се префрлат и во локалната база.

Користени табели

За демонстрација на пропагацијата се користат следните табели:

  • gragjanin
  • vozilo
  • kamera
  • prekrsok
  • kazna

Во централната база дополнително е креирана табелата:

  • propagation_log

Оваа табела служи за евиденција на сите промени што треба да се префрлат од централната во локалната база.

Табела propagation_log

Табелата propagation_log претставува лог табела во која се запишуваат сите нови записи што треба да се пропагираат.

Секој запис во оваа табела содржи информации за:

  • табелата во која е направена промената
  • типот на операција
  • идентификаторот на записот
  • payload со вредностите на записот
  • статус на обработка
  • датум на креирање
  • датум на обработка
  • порака за грешка, доколку пропагацијата не успее
CREATE TABLE propagation_log
(
log_id           SERIAL PRIMARY KEY,
tabela           VARCHAR(50) NOT NULL,
operacija        VARCHAR(20) NOT NULL,
zapis_id         VARCHAR(50) NOT NULL,
payload          JSONB NOT NULL,
status           VARCHAR(20) DEFAULT 'PENDING',
datum_kreiranje  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
datum_obrabotka  TIMESTAMP,
poraka_greska    TEXT
);

Статусот може да има една од следните вредности:

  • PENDING - записот чека да биде префрлен во локалната база
  • DONE - записот е успешно префрлен
  • FAILED - настанала грешка при префрлањето

Функција за логирање на промени

За евидентирање на новите записи е креирана функцијата:

fn_log_propagation_insert()

Оваа функција се повикува преку trigger-и. Таа го зема новиот внесен ред преку NEW, го претвора во JSONB формат и го запишува во propagation_log.

CREATE OR REPLACE FUNCTION fn_log_propagation_insert()
RETURNS TRIGGER AS $$
DECLARE
pk_column TEXT;
pk_value  TEXT;
BEGIN
pk_column := TG_ARGV[0];
pk_value := to_jsonb(NEW) ->> pk_column;

```
INSERT INTO propagation_log (
    tabela,
    operacija,
    zapis_id,
    payload
)
VALUES (
    TG_TABLE_NAME,
    'INSERT',
    pk_value,
    to_jsonb(NEW)
);

RETURN NEW;
```

END;
$$ LANGUAGE plpgsql;

Функцијата како аргумент го добива името на primary key колоната за табелата на која се однесува trigger-от. На тој начин истата функција може да се користи за повеќе табели.

Користењето на JSONB овозможува целиот запис да се зачува како payload, без потреба да се креира посебна log табела за секоја табела.

Trigger-и

Во централната база се креирани trigger-и за табелите:

  • gragjanin
  • vozilo
  • kamera
  • prekrsok
  • kazna

Trigger-ите се активираат по секој INSERT во соодветната табела. Тие не ги префрлаат директно податоците во локалната база, туку автоматски додаваат запис во propagation_log со статус PENDING.

DROP TRIGGER IF EXISTS trg_log_gragjanin_insert ON gragjanin;
DROP TRIGGER IF EXISTS trg_log_vozilo_insert ON vozilo;
DROP TRIGGER IF EXISTS trg_log_kamera_insert ON kamera;
DROP TRIGGER IF EXISTS trg_log_prekrsok_insert ON prekrsok;
DROP TRIGGER IF EXISTS trg_log_kazna_insert ON kazna;

CREATE TRIGGER trg_log_gragjanin_insert
AFTER INSERT ON gragjanin
FOR EACH ROW
EXECUTE FUNCTION fn_log_propagation_insert('embg');

CREATE TRIGGER trg_log_vozilo_insert
AFTER INSERT ON vozilo
FOR EACH ROW
EXECUTE FUNCTION fn_log_propagation_insert('registarska_oznaka');

CREATE TRIGGER trg_log_kamera_insert
AFTER INSERT ON kamera
FOR EACH ROW
EXECUTE FUNCTION fn_log_propagation_insert('kamera_id');

CREATE TRIGGER trg_log_prekrsok_insert
AFTER INSERT ON prekrsok
FOR EACH ROW
EXECUTE FUNCTION fn_log_propagation_insert('prekrsok_id');

CREATE TRIGGER trg_log_kazna_insert
AFTER INSERT ON kazna
FOR EACH ROW
EXECUTE FUNCTION fn_log_propagation_insert('kazna_id');

Со ова при секој нов INSERT во некоја од наведените табели автоматски се креира запис во propagation_log.

Процедура за пропагација

За префрлање на податоците од централната во локалната база е креирана процедурата:

sp_propagiraj_vo_lokalna_baza()

Процедурата ги зема сите записи од propagation_log со статус PENDING. За секој запис проверува од која табела доаѓа, го чита payload-от и го внесува соодветниот запис во локалната база.

Ако внесувањето е успешно, статусот во propagation_log се менува во DONE. Ако настане грешка, статусот се менува во FAILED и се запишува пораката за грешка.

CREATE OR REPLACE PROCEDURE sp_propagiraj_vo_lokalna_baza()
LANGUAGE plpgsql
AS $$
DECLARE
r RECORD;
p JSONB;
conn TEXT := 'host=127.0.0.1 port=5430 dbname=scs_local user=postgres password=postgres';
BEGIN
FOR r IN
SELECT *
FROM propagation_log
WHERE status = 'PENDING'
ORDER BY log_id
LOOP
BEGIN
p := r.payload;

```
        IF r.tabela = 'gragjanin' THEN

            PERFORM dblink_exec(
                conn,
                format(
                    'INSERT INTO gragjanin
                     (embg, ime, prezime, adresa, grad, telefonski_broj, datum_ragjanje, pol)
                     VALUES (%L, %L, %L, %L, %L, %L, %L, %L)
                     ON CONFLICT (embg) DO NOTHING',
                    p ->> 'embg',
                    p ->> 'ime',
                    p ->> 'prezime',
                    p ->> 'adresa',
                    p ->> 'grad',
                    p ->> 'telefonski_broj',
                    p ->> 'datum_ragjanje',
                    p ->> 'pol'
                )
            );

        ELSIF r.tabela = 'vozilo' THEN

            PERFORM dblink_exec(
                conn,
                format(
                    'INSERT INTO vozilo
                     (registarska_oznaka, marka, model, boja, embg_sopstvenik)
                     VALUES (%L, %L, %L, %L, %L)
                     ON CONFLICT (registarska_oznaka) DO NOTHING',
                    p ->> 'registarska_oznaka',
                    p ->> 'marka',
                    p ->> 'model',
                    p ->> 'boja',
                    p ->> 'embg_sopstvenik'
                )
            );

        ELSIF r.tabela = 'kamera' THEN

            PERFORM dblink_exec(
                conn,
                format(
                    'INSERT INTO kamera
                     (kamera_id, lokacija, aktivna)
                     VALUES (%L, %L, %L)
                     ON CONFLICT (kamera_id) DO NOTHING',
                    p ->> 'kamera_id',
                    p ->> 'lokacija',
                    p ->> 'aktivna'
                )
            );

        ELSIF r.tabela = 'prekrsok' THEN

            PERFORM dblink_exec(
                conn,
                format(
                    'INSERT INTO prekrsok
                     (prekrsok_id, datum, opis, kamera_id, registarska_oznaka, embg_storitel)
                     VALUES (%L, %L, %L, %L, %L, %L)
                     ON CONFLICT (prekrsok_id) DO NOTHING',
                    p ->> 'prekrsok_id',
                    p ->> 'datum',
                    p ->> 'opis',
                    p ->> 'kamera_id',
                    p ->> 'registarska_oznaka',
                    p ->> 'embg_storitel'
                )
            );

        ELSIF r.tabela = 'kazna' THEN

            PERFORM dblink_exec(
                conn,
                format(
                    'INSERT INTO kazna
                     (kazna_id, prekrsok_id, datum, iznos_za_plakanje, rok_na_plakanje, status)
                     VALUES (%L, %L, %L, %L, %L, %L)
                     ON CONFLICT (kazna_id) DO NOTHING',
                    p ->> 'kazna_id',
                    p ->> 'prekrsok_id',
                    p ->> 'datum',
                    p ->> 'iznos_za_plakanje',
                    p ->> 'rok_na_plakanje',
                    p ->> 'status'
                )
            );

        END IF;

        UPDATE propagation_log
        SET status = 'DONE',
            datum_obrabotka = CURRENT_TIMESTAMP,
            poraka_greska = NULL
        WHERE log_id = r.log_id;

    EXCEPTION WHEN OTHERS THEN
        UPDATE propagation_log
        SET status = 'FAILED',
            datum_obrabotka = CURRENT_TIMESTAMP,
            poraka_greska = SQLERRM
        WHERE log_id = r.log_id;
    END;
END LOOP;
```

END;
$$;

Бидејќи централната и локалната база се две одделни PostgreSQL бази, за комуникација помеѓу нив е користен PostgreSQL extension:

dblink

Овој extension овозможува од една база да се извршуваат SQL команди кон друга база.

CREATE EXTENSION IF NOT EXISTS dblink;

Во процедурата sp_propagiraj_vo_lokalna_baza() преку dblink_exec се извршуваат INSERT команди кон локалната база.

Тек на пропагацијата

Целиот процес се одвива во следните чекори:

  1. Се внесува нов запис во некоја од табелите во централната база.
  2. Trigger автоматски се активира по INSERT операцијата.
  3. Trigger-от ја повикува функцијата fn_log_propagation_insert().
  4. Во propagation_log се додава запис со статус PENDING.
  5. Се повикува процедурата sp_propagiraj_vo_lokalna_baza().
  6. Процедурата ги префрла податоците во локалната база.
  7. Статусот во propagation_log се менува во DONE.

Пример за проверка

За проверка на записите во лог табелата се користи следното барање:

SELECT
log_id,
tabela,
operacija,
zapis_id,
status,
datum_kreiranje,
datum_obrabotka,
poraka_greska
FROM propagation_log
ORDER BY log_id;

По успешна пропагација, записите во колоната status треба да имаат вредност DONE.

За проверка дека податоците се префрлени во локалната база се прави SELECT од соодветните табели:

SELECT * FROM gragjanin;
SELECT * FROM vozilo;
SELECT * FROM kamera;
SELECT * FROM prekrsok;
SELECT * FROM kazna;

Заклучок

Со оваа напредна тема е имплементиран механизам за пропагација на податоци помеѓу централна и локална база. Решението користи trigger-и, log табела, stored procedure и dblink extension.

Овој пристап е соодветен за Smart City Security систем бидејќи овозможува централно внесување на податоци и нивна контролирана синхронизација кон локални бази, што е применливо во системи каде различни институции или зони треба да имаат пристап до релевантни податоци.

Note: See TracWiki for help on using the wiki.