wiki:appdevelopment

Version 6 (modified by 155036, 2 weeks ago) ( diff )

--

Напреден апликативен развој — Anomalyzer

Имплементирани се:

  • Изведени колони (year, month_year, fraction_date) преку BEFORE тригер
  • Автоматска дневна агрегирана табела events_daily_agg по INSERT/UPDATE/DELETE
  • Бизнис правила со CHECK и тригер (граници на tone; non-root настаните мора да имаат mentions)
  • Оптимизација на перформанси со индекси за чести join/filter полиња

Тригери

BEFORE INSERT/UPDATE на events

Нормализација на кодови, проверка на валидност и автоматско пополнување на деривирани полиња (year, month_year, fraction_date).

CREATE OR REPLACE FUNCTION trg_events_before_ins_upd()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
  IF NEW.event_code IS NOT NULL THEN
    NEW.event_code := upper(NEW.event_code);
  END IF;

  IF NEW.avg_tone IS NOT NULL AND (NEW.avg_tone < -10 OR NEW.avg_tone > 10) THEN
    RAISE EXCEPTION 'avg_tone out of range [-10,10]: %', NEW.avg_tone;
  END IF;

  IF NEW.sql_date IS NOT NULL THEN
    NEW.year := EXTRACT(YEAR FROM NEW.sql_date)::int;
    NEW.month_year := to_char(NEW.sql_date, 'MMYYYY');
    NEW.fraction_date := EXTRACT(DOY FROM NEW.sql_date) / 365.0;
  END IF;

  IF NEW.is_root_event IS FALSE AND COALESCE(NEW.num_mentions, 0) <= 0 THEN
    RAISE EXCEPTION 'non-root events must have num_mentions > 0';
  END IF;

  RETURN NEW;
END;
$$;

CREATE TRIGGER events_before_ins_upd
BEFORE INSERT OR UPDATE ON events
FOR EACH ROW
EXECUTE FUNCTION trg_events_before_ins_upd();

AFTER INSERT/UPDATE/DELETE на events

Автоматско пресметување на дневна агрегaција и чување во табелата events_daily_agg.

CREATE TABLE IF NOT EXISTS events_daily_agg (
  sql_date DATE PRIMARY KEY,
  events_count INTEGER NOT NULL,
  avg_tone NUMERIC(6,4)
);

CREATE OR REPLACE FUNCTION trg_events_after_ins_upd_del()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
  WITH s AS (
    SELECT d AS sql_date, COUNT(*) AS events_count, AVG(avg_tone) AS avg_tone
    FROM (SELECT COALESCE(NEW.sql_date, OLD.sql_date) AS d) q
    JOIN events e ON e.sql_date = q.d
    GROUP BY d
  )
  INSERT INTO events_daily_agg(sql_date, events_count, avg_tone)
  SELECT sql_date, events_count, avg_tone FROM s
  ON CONFLICT (sql_date) DO UPDATE
  SET events_count = EXCLUDED.events_count,
      avg_tone     = EXCLUDED.avg_tone;

  IF NOT EXISTS (
    SELECT 1 FROM events WHERE sql_date = COALESCE(NEW.sql_date, OLD.sql_date)
  ) THEN
    DELETE FROM events_daily_agg WHERE sql_date = COALESCE(NEW.sql_date, OLD.sql_date);
  END IF;

  RETURN NULL;
END;
$$;

CREATE TRIGGER events_after_ins
AFTER INSERT ON events
FOR EACH ROW
EXECUTE FUNCTION trg_events_after_ins_upd_del();

CREATE TRIGGER events_after_upd
AFTER UPDATE ON events
FOR EACH ROW
EXECUTE FUNCTION trg_events_after_ins_upd_del();

CREATE TRIGGER events_after_del
AFTER DELETE ON events
FOR EACH ROW
EXECUTE FUNCTION trg_events_after_ins_upd_del();

Трансакции

CRUD операциите врз events се извршуваат во рамки на трансакициски блокови. Во Django, ова се постигнува со @transaction.atomic, што овозможува автоматски ROLLBACK при грешка (нпр. кога тригер ќе подигне исклучок).

Пример:

# src/api/views/events.py
from django.db import transaction
from rest_framework.response import Response
from rest_framework.views import APIView
from .services.events import event_create

class EventListCreate(APIView):
    @transaction.atomic
    def post(self, request):
        payload = request.data
        out = event_create(payload)  # повик на p_event_create
        return Response(out, status=201)

p_event_create_bulk

CREATE OR REPLACE PROCEDURE p_event_bulk_create(
  IN  _items jsonb,
  INOUT cur refcursor
)
LANGUAGE plpgsql
AS $$
BEGIN
  -- Validate top-level shape
  IF jsonb_typeof(_items) <> 'array' THEN
    RAISE EXCEPTION 'payload must be a JSON array';
  END IF;

  OPEN cur FOR
  WITH ins AS (
      INSERT INTO events (
        sql_date, is_root_event, event_code, goldstein_scale,
        num_mentions, num_sources, num_articles, avg_tone,
        month_year, year, fraction_date
      )
      SELECT
        (x->>'sql_date')::date,
        COALESCE((x->>'is_root_event')::boolean, TRUE),
        (x->>'event_code')::text,
        NULLIF(x->>'goldstein_scale','')::numeric,
        NULLIF(x->>'num_mentions','')::int,
        NULLIF(x->>'num_sources','')::int,
        NULLIF(x->>'num_articles','')::int,
        NULLIF(x->>'avg_tone','')::numeric,
        to_char((x->>'sql_date')::date, 'MMYYYY'),
        EXTRACT(YEAR FROM (x->>'sql_date')::date)::int,
        EXTRACT(DOY FROM (x->>'sql_date')::date) / 365.0
      FROM jsonb_array_elements(_items) AS x
      RETURNING global_event_id, sql_date, event_code
  )
  SELECT jsonb_build_object(
    'ok', true,
    'inserted_count', (SELECT count(*) FROM ins),
    'inserted_ids', (SELECT jsonb_agg(global_event_id) FROM ins)
  ) AS j;

EXCEPTION
  WHEN OTHERS THEN
    -- Any error will abort the outer transaction unless handled
    RAISE;
END;
$$;

p_event_create_update_delete_bulk

-- Multiple operations in one go: INSERT + UPDATE + DELETE
-- Payload shape:
-- {
--   "insert": [
--     {
--       "sql_date": "2025-09-01",
--       "is_root_event": true,
--       "event_code": "E123",
--       "goldstein_scale": 1.5,
--       "num_mentions": 5,
--       "num_sources": 2,
--       "num_articles": 3,
--       "avg_tone": 0.75
--     }, ...
--   ],
--   "update": [
--     {
--       "global_event_id": 42,
--       "sql_date": "2025-09-02",          
--       "num_mentions": 7,
--       "avg_tone": 0.1
--     }, ...
--   ],
--   "delete": [ 10, 11, 12 ]      
-- }

CREATE OR REPLACE PROCEDURE p_event_bulk_apply(
  IN  _payload jsonb,
  INOUT cur refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE
  v_has_insert boolean := (_payload ? 'insert');
  v_has_update boolean := (_payload ? 'update');
  v_has_delete boolean := (_payload ? 'delete');
BEGIN
  -- Basic validation
  IF _payload IS NULL OR jsonb_typeof(_payload) <> 'object' THEN
    RAISE EXCEPTION 'payload must be a JSON object with optional keys: insert, update, delete';
  END IF;
  IF v_has_insert AND jsonb_typeof(_payload->'insert') <> 'array' THEN
    RAISE EXCEPTION '"insert" must be an array';
  END IF;
  IF v_has_update AND jsonb_typeof(_payload->'update') <> 'array' THEN
    RAISE EXCEPTION '"update" must be an array';
  END IF;
  IF v_has_delete AND jsonb_typeof(_payload->'delete') <> 'array' THEN
    RAISE EXCEPTION '"delete" must be an array of IDs';
  END IF;

  -- Perform all operations in a single statement block using CTEs
  IF cur IS NULL THEN cur := 'cur_event_bulk_apply'; END IF;
  OPEN cur FOR
  WITH
  -- 1) INSERT many (optional)
  ins AS (
    SELECT * FROM (
      INSERT INTO events (
        sql_date, is_root_event, event_code, goldstein_scale,
        num_mentions, num_sources, num_articles, avg_tone,
        month_year, year, fraction_date
      )
      SELECT
        (x->>'sql_date')::date,
        COALESCE((x->>'is_root_event')::boolean, TRUE),
        UPPER((x->>'event_code')::text),
        NULLIF(x->>'goldstein_scale','')::numeric,
        NULLIF(x->>'num_mentions','')::int,
        NULLIF(x->>'num_sources','')::int,
        NULLIF(x->>'num_articles','')::int,
        NULLIF(x->>'avg_tone','')::numeric,
        TO_CHAR((x->>'sql_date')::date, 'MMYYYY'),
        EXTRACT(YEAR FROM (x->>'sql_date')::date)::int,
        EXTRACT(DOY FROM (x->>'sql_date')::date)::numeric / 365.0
      FROM jsonb_array_elements(COALESCE(_payload->'insert', '[]'::jsonb)) AS x
      RETURNING global_event_id
    ) t
  ),
  upd_src AS (
    SELECT *
    FROM jsonb_to_recordset(COALESCE(_payload->'update','[]'::jsonb)) AS r(
      global_event_id int,
      sql_date date,
      is_root_event boolean,
      event_code text,
      goldstein_scale numeric(5,2),
      num_mentions int,
      num_sources int,
      num_articles int,
      avg_tone numeric(5,2)
    )
  ),
  upd AS (
    UPDATE events e
    SET
      sql_date       = COALESCE(u.sql_date, e.sql_date),
      is_root_event  = COALESCE(u.is_root_event, e.is_root_event),
      event_code     = COALESCE(UPPER(u.event_code), e.event_code),
      goldstein_scale= COALESCE(u.goldstein_scale, e.goldstein_scale),
      num_mentions   = COALESCE(u.num_mentions, e.num_mentions),
      num_sources    = COALESCE(u.num_sources,  e.num_sources),
      num_articles   = COALESCE(u.num_articles, e.num_articles),
      avg_tone       = COALESCE(u.avg_tone,     e.avg_tone),
      month_year     = CASE WHEN u.sql_date IS NULL THEN e.month_year ELSE TO_CHAR(u.sql_date,'MMYYYY') END,
      year           = CASE WHEN u.sql_date IS NULL THEN e.year ELSE EXTRACT(YEAR FROM u.sql_date)::int END,
      fraction_date  = CASE WHEN u.sql_date IS NULL THEN e.fraction_date ELSE (EXTRACT(DOY FROM u.sql_date)::numeric / 365.0) END
    FROM upd_src u
    WHERE e.global_event_id = u.global_event_id
    RETURNING e.global_event_id
  ),
  -- 3) DELETE many (optional)
  del_ids AS (
    SELECT (x)::int AS id
    FROM jsonb_array_elements_text(COALESCE(_payload->'delete','[]'::jsonb)) AS x
  ),
  del AS (
    DELETE FROM events e
    USING del_ids d
    WHERE e.global_event_id = d.id
    RETURNING e.global_event_id
  )
  SELECT jsonb_build_object(
    'ok', true,
    'inserted_count', (SELECT COUNT(*) FROM ins),
    'inserted_ids',   COALESCE((SELECT jsonb_agg(global_event_id) FROM ins), '[]'::jsonb),
    'updated_count',  (SELECT COUNT(*) FROM upd),
    'updated_ids',    COALESCE((SELECT jsonb_agg(global_event_id) FROM upd), '[]'::jsonb),
    'deleted_count',  (SELECT COUNT(*) FROM del),
    'deleted_ids',    COALESCE((SELECT jsonb_agg(global_event_id) FROM del), '[]'::jsonb)
  ) AS j;


EXCEPTION
  WHEN OTHERS THEN
    RAISE;
END;
$$;

p_event_risk_overview

CREATE OR REPLACE PROCEDURE p_event_risk_overview(
  IN  p_event_id   int,
  IN  p_actor_id   int,
  IN  p_user_id    int,
  IN  p_from_date  date DEFAULT NULL,
  IN  p_to_date    date DEFAULT NULL,
  INOUT p_cur      refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
BEGIN
  IF p_cur IS NULL THEN p_cur := 'cur_event_risk_overview'; END IF;

  OPEN p_cur FOR
  WITH
  -- Кој актер и која локација се поврзани со настанот
  event_context AS (
    SELECT ed.global_event_id, ed.actor_id, ed.location_id
    FROM event_details ed
    WHERE ed.global_event_id = p_event_id
       OR ed.actor_id = p_actor_id
  ),
  -- Нотификации поврзани со корисник или настан
  notif AS (
    SELECT n.notification_id, n.user_id, n.notification_date, n.event_id
    FROM notifications n
    WHERE (n.user_id = p_user_id AND n.notification_date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date))
       OR (n.event_id = p_event_id)
  ),
  -- Прогнози поврзани со настанот/актерот
  preds AS (
    SELECT pr.prediction_id, pr.event_id, pr.actor_id, pr.predicted_date, pr.confidence
    FROM predictions pr
    WHERE pr.event_id = p_event_id
       OR pr.actor_id = p_actor_id
  ),
  -- Аналитика за настан + актер по датуми
  analytics AS (
    SELECT ea.event_id, ea.actor_id, ea.date, ea.metric_value
    FROM event_analytics ea
    WHERE ea.event_id = p_event_id
      AND ea.actor_id = p_actor_id
      AND ea.date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date)
  ),
  -- Ризици помеѓу парови актери
  risks AS (
    SELECT cr.actor1_id, cr.actor2_id, cr.predicted_date, cr.risk_score
    FROM conflict_risk cr
    WHERE (cr.actor1_id = p_actor_id OR cr.actor2_id = p_actor_id)
      AND cr.predicted_date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date)
  )
  SELECT jsonb_build_object(
    'event_id', p_event_id,
    'actor_id', p_actor_id,
    'notifications', COALESCE(jsonb_agg(DISTINCT notif), '[]'::jsonb),
    'predictions',  COALESCE(jsonb_agg(DISTINCT preds), '[]'::jsonb),
    'analytics',    COALESCE(jsonb_agg(DISTINCT analytics), '[]'::jsonb),
    'risks',        COALESCE(jsonb_agg(DISTINCT risks), '[]'::jsonb)
  ) AS j
  FROM notif, preds, analytics, risks;
END;
$$;

Објаснување на Execution Plan

При повик на p_event_risk_overview со EXPLAIN (ANALYZE, BUFFERS), PostgreSQL користи индекс скенирања за оптимален пристап:

  • event_detailsIndex Scan на ix_event_details_event или ix_event_details_actor за брзо наоѓање актери/локации поврзани со настанот.
  • notificationsIndex Scan на ix_notifications_user (композитен индекс по user/date) или ix_notifications_event за пребарување на кориснички нотификации.
  • predictionsIndex Scan на ix_predictions_event или ix_predictions_actor за селекција на предвидувања.
  • event_analyticsIndex Scan на ix_analytics_event_actor со дополнително филтрирање по date.
  • conflict_riskIndex Scan на ix_conflict_pair_date за проверка на ризици меѓу актери во даден временски интервал.

Ова покажува дека секоја табела се пристапува преку соодветен индекс наместо со целосно скенирање, што значително ја зголемува брзината и ја намалува потрошувачката на ресурси при сложени аналитички пребарувања.

Индекси

За перформанси при пребарување и join операции, креирани се индекси:

CREATE INDEX ix_event_details_event ON event_details (global_event_id);
CREATE INDEX ix_event_details_actor ON event_details (actor_id);
CREATE INDEX ix_event_details_loc   ON event_details (location_id);

CREATE INDEX ix_notifications_user  ON notifications (user_id, notification_date);
CREATE INDEX ix_notifications_event ON notifications (event_id);

CREATE INDEX ix_predictions_event   ON predictions (event_id);
CREATE INDEX ix_predictions_actor   ON predictions (actor_id);

CREATE INDEX ix_analytics_event_actor ON event_analytics (event_id, actor_id, date);
CREATE INDEX ix_conflict_pair_date    ON conflict_risk (actor1_id, actor2_id, predicted_date);

Складирани процедури

Имплементирани се CRUD процедури за events, со JSON резултат погоден за директен REST одговор.

p_event_create

CREATE OR REPLACE PROCEDURE p_event_create(
  IN  p_sql_date date,
  IN  p_is_root_event boolean,
  IN  p_event_code varchar(10),
  IN  p_goldstein_scale numeric(5,2) DEFAULT NULL,
  IN  p_num_mentions int DEFAULT NULL,
  IN  p_num_sources  int DEFAULT NULL,
  IN  p_num_articles int DEFAULT NULL,
  IN  p_avg_tone    numeric(5,2) DEFAULT NULL,
  INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE v_id int;
BEGIN
  INSERT INTO events (
    sql_date, month_year, year, fraction_date, is_root_event, event_code,
    goldstein_scale, num_mentions, num_sources, num_articles, avg_tone
  )
  VALUES (
    p_sql_date, TO_CHAR(p_sql_date,'MMYYYY'),
    EXTRACT(YEAR FROM p_sql_date)::int,
    (EXTRACT(DOY FROM p_sql_date)::numeric / 365),
    COALESCE(p_is_root_event, TRUE), p_event_code,
    p_goldstein_scale, p_num_mentions, p_num_sources, p_num_articles, p_avg_tone
  )
  RETURNING global_event_id INTO v_id;

  IF p_result IS NULL THEN p_result := 'cur_event_create'; END IF;
  OPEN p_result FOR SELECT to_jsonb(e) AS j FROM events e WHERE e.global_event_id = v_id;
END;
$$;

p_event_get

CREATE OR REPLACE PROCEDURE p_event_get(
  IN  p_event_id int,
  INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
BEGIN
  IF p_result IS NULL THEN p_result := 'cur_event_get'; END IF;
  OPEN p_result FOR
    SELECT COALESCE(
             (SELECT to_jsonb(e) FROM events e WHERE e.global_event_id = p_event_id),
             jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id)
           ) AS j;
END;
$$;

p_event_list

CREATE OR REPLACE PROCEDURE p_event_list(
  IN  p_limit int DEFAULT 50,
  IN  p_offset int DEFAULT 0,
  INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
BEGIN
  IF p_result IS NULL THEN p_result := 'cur_event_list'; END IF;
  OPEN p_result FOR
    WITH rows AS (
      SELECT to_jsonb(e) AS j
      FROM events e
      ORDER BY e.global_event_id DESC
      LIMIT GREATEST(p_limit,0) OFFSET GREATEST(p_offset,0)
    )
    SELECT jsonb_build_object(
      'ok', true,
      'limit', p_limit,
      'offset', p_offset,
      'results', COALESCE(jsonb_agg(j), '[]'::jsonb)
    ) AS j
    FROM rows;
END;
$$;

p_event_update

CREATE OR REPLACE PROCEDURE p_event_update(
  IN  p_event_id int,
  IN  p_sql_date date DEFAULT NULL,
  IN  p_is_root_event boolean DEFAULT NULL,
  IN  p_event_code varchar(10) DEFAULT NULL,
  IN  p_goldstein_scale numeric(5,2) DEFAULT NULL,
  IN  p_num_mentions int DEFAULT NULL,
  IN  p_num_sources  int DEFAULT NULL,
  IN  p_num_articles int DEFAULT NULL,
  IN  p_avg_tone    numeric(5,2) DEFAULT NULL,
  INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE v_exists boolean;
BEGIN
  SELECT EXISTS(SELECT 1 FROM events WHERE global_event_id = p_event_id) INTO v_exists;
  IF NOT v_exists THEN
    IF p_result IS NULL THEN p_result := 'cur_event_update'; END IF;
    OPEN p_result FOR
      SELECT jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) AS j;
    RETURN;
  END IF;

  UPDATE events SET
    sql_date       = COALESCE(p_sql_date, sql_date),
    is_root_event  = COALESCE(p_is_root_event, is_root_event),
    event_code     = COALESCE(p_event_code, event_code),
    goldstein_scale= COALESCE(p_goldstein_scale, goldstein_scale),
    num_mentions   = COALESCE(p_num_mentions, num_mentions),
    num_sources    = COALESCE(p_num_sources,  num_sources),
    num_articles   = COALESCE(p_num_articles, num_articles),
    avg_tone       = COALESCE(p_avg_tone,     avg_tone),
    month_year     = CASE WHEN p_sql_date IS NULL THEN month_year ELSE TO_CHAR(p_sql_date,'MMYYYY') END,
    year           = CASE WHEN p_sql_date IS NULL THEN year ELSE EXTRACT(YEAR FROM p_sql_date)::int END,
    fraction_date  = CASE WHEN p_sql_date IS NULL THEN fraction_date ELSE (EXTRACT(DOY FROM p_sql_date)::numeric / 365) END
  WHERE global_event_id = p_event_id;

  IF p_result IS NULL THEN p_result := 'cur_event_update'; END IF;
  OPEN p_result FOR SELECT to_jsonb(e) AS j FROM events e WHERE e.global_event_id = p_event_id;
END;
$$;

p_event_delete

CREATE OR REPLACE PROCEDURE p_event_delete(
  IN  p_event_id int,
  INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE v_ct int;
BEGIN
  DELETE FROM events WHERE global_event_id = p_event_id;
  GET DIAGNOSTICS v_ct = ROW_COUNT;

  IF p_result IS NULL THEN p_result := 'cur_event_delete'; END IF;
  IF v_ct = 0 THEN
    OPEN p_result FOR
      SELECT jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) AS j;
  ELSE
    OPEN p_result FOR
      SELECT jsonb_build_object('ok', true, 'deleted_id', p_event_id) AS j;
  END IF;
END;
$$;

Заклучок

Со оваа комбинација на тригери, трансакициски операции, индекси и складирани процедури, Django апликацијата обезбедува:

  • Автоматско ажурирање на агрегирани табели
  • Валидација на бизнис правила на податочно ниво
  • Оптимизирано пребарување со индекси
  • Стандарден JSON излез преку PostgreSQL процедури
Note: See TracWiki for help on using the wiki.