Version 7 (modified by 2 weeks ago) ( diff ) | ,
---|
Напреден апликативен развој — Anomalyzer
Имплементирани се:
- Изведени колони (year, month_year, fraction_date) преку BEFORE тригер
- Автоматска дневна агрегирана табела
events_daily_agg
по INSERT/UPDATE/DELETE - Бизнис правила со CHECK и тригер (граници на tone; non-root настаните мора да имаат mentions)
- Оптимизација на перформанси со индекси за чести join/filter полиња
Тригери
BEFORE INSERT/UPDATE на events
Нормализација на кодови, проверка на валидност и автоматско пополнување на деривирани полиња (year
, month_year
, fraction_date
).
CREATE OR REPLACE FUNCTION trg_events_before_ins_upd() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN IF NEW.event_code IS NOT NULL THEN NEW.event_code := upper(NEW.event_code); END IF; IF NEW.avg_tone IS NOT NULL AND (NEW.avg_tone < -10 OR NEW.avg_tone > 10) THEN RAISE EXCEPTION 'avg_tone out of range [-10,10]: %', NEW.avg_tone; END IF; IF NEW.sql_date IS NOT NULL THEN NEW.year := EXTRACT(YEAR FROM NEW.sql_date)::int; NEW.month_year := to_char(NEW.sql_date, 'MMYYYY'); NEW.fraction_date := EXTRACT(DOY FROM NEW.sql_date) / 365.0; END IF; IF NEW.is_root_event IS FALSE AND COALESCE(NEW.num_mentions, 0) <= 0 THEN RAISE EXCEPTION 'non-root events must have num_mentions > 0'; END IF; RETURN NEW; END; $$; CREATE TRIGGER events_before_ins_upd BEFORE INSERT OR UPDATE ON events FOR EACH ROW EXECUTE FUNCTION trg_events_before_ins_upd();
AFTER INSERT/UPDATE/DELETE на events
Автоматско пресметување на дневна агрегaција и чување во табелата events_daily_agg
.
CREATE TABLE IF NOT EXISTS events_daily_agg ( sql_date DATE PRIMARY KEY, events_count INTEGER NOT NULL, avg_tone NUMERIC(6,4) ); CREATE OR REPLACE FUNCTION trg_events_after_ins_upd_del() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN WITH s AS ( SELECT d AS sql_date, COUNT(*) AS events_count, AVG(avg_tone) AS avg_tone FROM (SELECT COALESCE(NEW.sql_date, OLD.sql_date) AS d) q JOIN events e ON e.sql_date = q.d GROUP BY d ) INSERT INTO events_daily_agg(sql_date, events_count, avg_tone) SELECT sql_date, events_count, avg_tone FROM s ON CONFLICT (sql_date) DO UPDATE SET events_count = EXCLUDED.events_count, avg_tone = EXCLUDED.avg_tone; IF NOT EXISTS ( SELECT 1 FROM events WHERE sql_date = COALESCE(NEW.sql_date, OLD.sql_date) ) THEN DELETE FROM events_daily_agg WHERE sql_date = COALESCE(NEW.sql_date, OLD.sql_date); END IF; RETURN NULL; END; $$; CREATE TRIGGER events_after_ins AFTER INSERT ON events FOR EACH ROW EXECUTE FUNCTION trg_events_after_ins_upd_del(); CREATE TRIGGER events_after_upd AFTER UPDATE ON events FOR EACH ROW EXECUTE FUNCTION trg_events_after_ins_upd_del(); CREATE TRIGGER events_after_del AFTER DELETE ON events FOR EACH ROW EXECUTE FUNCTION trg_events_after_ins_upd_del();
Трансакции
CRUD операциите врз events
се извршуваат во рамки на трансакициски блокови.
Во Django, ова се постигнува со @transaction.atomic
, што овозможува автоматски ROLLBACK при грешка (нпр. кога тригер ќе подигне исклучок).
Пример:
# src/api/views/events.py from django.db import transaction from rest_framework.response import Response from rest_framework.views import APIView from .services.events import event_create class EventListCreate(APIView): @transaction.atomic def post(self, request): payload = request.data out = event_create(payload) # повик на p_event_create return Response(out, status=201)
Индекси
За перформанси при пребарување и join операции, креирани се индекси:
CREATE INDEX ix_event_details_event ON event_details (global_event_id); CREATE INDEX ix_event_details_actor ON event_details (actor_id); CREATE INDEX ix_event_details_loc ON event_details (location_id); CREATE INDEX ix_notifications_user ON notifications (user_id, notification_date); CREATE INDEX ix_notifications_event ON notifications (event_id); CREATE INDEX ix_predictions_event ON predictions (event_id); CREATE INDEX ix_predictions_actor ON predictions (actor_id); CREATE INDEX ix_analytics_event_actor ON event_analytics (event_id, actor_id, date); CREATE INDEX ix_conflict_pair_date ON conflict_risk (actor1_id, actor2_id, predicted_date);
Складирани процедури
Имплементирани се CRUD процедури за events
, со JSON резултат погоден за директен REST одговор.
p_event_create_bulk
CREATE OR REPLACE PROCEDURE p_event_bulk_create( IN _items jsonb, INOUT cur refcursor ) LANGUAGE plpgsql AS $$ BEGIN -- Validate top-level shape IF jsonb_typeof(_items) <> 'array' THEN RAISE EXCEPTION 'payload must be a JSON array'; END IF; OPEN cur FOR WITH ins AS ( INSERT INTO events ( sql_date, is_root_event, event_code, goldstein_scale, num_mentions, num_sources, num_articles, avg_tone, month_year, year, fraction_date ) SELECT (x->>'sql_date')::date, COALESCE((x->>'is_root_event')::boolean, TRUE), (x->>'event_code')::text, NULLIF(x->>'goldstein_scale','')::numeric, NULLIF(x->>'num_mentions','')::int, NULLIF(x->>'num_sources','')::int, NULLIF(x->>'num_articles','')::int, NULLIF(x->>'avg_tone','')::numeric, to_char((x->>'sql_date')::date, 'MMYYYY'), EXTRACT(YEAR FROM (x->>'sql_date')::date)::int, EXTRACT(DOY FROM (x->>'sql_date')::date) / 365.0 FROM jsonb_array_elements(_items) AS x RETURNING global_event_id, sql_date, event_code ) SELECT jsonb_build_object( 'ok', true, 'inserted_count', (SELECT count(*) FROM ins), 'inserted_ids', (SELECT jsonb_agg(global_event_id) FROM ins) ) AS j; EXCEPTION WHEN OTHERS THEN -- Any error will abort the outer transaction unless handled RAISE; END; $$;
p_event_create_update_delete_bulk
-- Multiple operations in one go: INSERT + UPDATE + DELETE -- Payload shape: -- { -- "insert": [ -- { -- "sql_date": "2025-09-01", -- "is_root_event": true, -- "event_code": "E123", -- "goldstein_scale": 1.5, -- "num_mentions": 5, -- "num_sources": 2, -- "num_articles": 3, -- "avg_tone": 0.75 -- }, ... -- ], -- "update": [ -- { -- "global_event_id": 42, -- "sql_date": "2025-09-02", -- "num_mentions": 7, -- "avg_tone": 0.1 -- }, ... -- ], -- "delete": [ 10, 11, 12 ] -- } CREATE OR REPLACE PROCEDURE p_event_bulk_apply( IN _payload jsonb, INOUT cur refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ DECLARE v_has_insert boolean := (_payload ? 'insert'); v_has_update boolean := (_payload ? 'update'); v_has_delete boolean := (_payload ? 'delete'); BEGIN -- Basic validation IF _payload IS NULL OR jsonb_typeof(_payload) <> 'object' THEN RAISE EXCEPTION 'payload must be a JSON object with optional keys: insert, update, delete'; END IF; IF v_has_insert AND jsonb_typeof(_payload->'insert') <> 'array' THEN RAISE EXCEPTION '"insert" must be an array'; END IF; IF v_has_update AND jsonb_typeof(_payload->'update') <> 'array' THEN RAISE EXCEPTION '"update" must be an array'; END IF; IF v_has_delete AND jsonb_typeof(_payload->'delete') <> 'array' THEN RAISE EXCEPTION '"delete" must be an array of IDs'; END IF; -- Perform all operations in a single statement block using CTEs IF cur IS NULL THEN cur := 'cur_event_bulk_apply'; END IF; OPEN cur FOR WITH -- 1) INSERT many (optional) ins AS ( SELECT * FROM ( INSERT INTO events ( sql_date, is_root_event, event_code, goldstein_scale, num_mentions, num_sources, num_articles, avg_tone, month_year, year, fraction_date ) SELECT (x->>'sql_date')::date, COALESCE((x->>'is_root_event')::boolean, TRUE), UPPER((x->>'event_code')::text), NULLIF(x->>'goldstein_scale','')::numeric, NULLIF(x->>'num_mentions','')::int, NULLIF(x->>'num_sources','')::int, NULLIF(x->>'num_articles','')::int, NULLIF(x->>'avg_tone','')::numeric, TO_CHAR((x->>'sql_date')::date, 'MMYYYY'), EXTRACT(YEAR FROM (x->>'sql_date')::date)::int, EXTRACT(DOY FROM (x->>'sql_date')::date)::numeric / 365.0 FROM jsonb_array_elements(COALESCE(_payload->'insert', '[]'::jsonb)) AS x RETURNING global_event_id ) t ), upd_src AS ( SELECT * FROM jsonb_to_recordset(COALESCE(_payload->'update','[]'::jsonb)) AS r( global_event_id int, sql_date date, is_root_event boolean, event_code text, goldstein_scale numeric(5,2), num_mentions int, num_sources int, num_articles int, avg_tone numeric(5,2) ) ), upd AS ( UPDATE events e SET sql_date = COALESCE(u.sql_date, e.sql_date), is_root_event = COALESCE(u.is_root_event, e.is_root_event), event_code = COALESCE(UPPER(u.event_code), e.event_code), goldstein_scale= COALESCE(u.goldstein_scale, e.goldstein_scale), num_mentions = COALESCE(u.num_mentions, e.num_mentions), num_sources = COALESCE(u.num_sources, e.num_sources), num_articles = COALESCE(u.num_articles, e.num_articles), avg_tone = COALESCE(u.avg_tone, e.avg_tone), month_year = CASE WHEN u.sql_date IS NULL THEN e.month_year ELSE TO_CHAR(u.sql_date,'MMYYYY') END, year = CASE WHEN u.sql_date IS NULL THEN e.year ELSE EXTRACT(YEAR FROM u.sql_date)::int END, fraction_date = CASE WHEN u.sql_date IS NULL THEN e.fraction_date ELSE (EXTRACT(DOY FROM u.sql_date)::numeric / 365.0) END FROM upd_src u WHERE e.global_event_id = u.global_event_id RETURNING e.global_event_id ), -- 3) DELETE many (optional) del_ids AS ( SELECT (x)::int AS id FROM jsonb_array_elements_text(COALESCE(_payload->'delete','[]'::jsonb)) AS x ), del AS ( DELETE FROM events e USING del_ids d WHERE e.global_event_id = d.id RETURNING e.global_event_id ) SELECT jsonb_build_object( 'ok', true, 'inserted_count', (SELECT COUNT(*) FROM ins), 'inserted_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM ins), '[]'::jsonb), 'updated_count', (SELECT COUNT(*) FROM upd), 'updated_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM upd), '[]'::jsonb), 'deleted_count', (SELECT COUNT(*) FROM del), 'deleted_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM del), '[]'::jsonb) ) AS j; EXCEPTION WHEN OTHERS THEN RAISE; END; $$;
p_event_risk_overview
CREATE OR REPLACE PROCEDURE p_event_risk_overview( IN p_event_id int, IN p_actor_id int, IN p_user_id int, IN p_from_date date DEFAULT NULL, IN p_to_date date DEFAULT NULL, INOUT p_cur refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ BEGIN IF p_cur IS NULL THEN p_cur := 'cur_event_risk_overview'; END IF; OPEN p_cur FOR WITH -- Кој актер и која локација се поврзани со настанот event_context AS ( SELECT ed.global_event_id, ed.actor_id, ed.location_id FROM event_details ed WHERE ed.global_event_id = p_event_id OR ed.actor_id = p_actor_id ), -- Нотификации поврзани со корисник или настан notif AS ( SELECT n.notification_id, n.user_id, n.notification_date, n.event_id FROM notifications n WHERE (n.user_id = p_user_id AND n.notification_date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date)) OR (n.event_id = p_event_id) ), -- Прогнози поврзани со настанот/актерот preds AS ( SELECT pr.prediction_id, pr.event_id, pr.actor_id, pr.predicted_date, pr.confidence FROM predictions pr WHERE pr.event_id = p_event_id OR pr.actor_id = p_actor_id ), -- Аналитика за настан + актер по датуми analytics AS ( SELECT ea.event_id, ea.actor_id, ea.date, ea.metric_value FROM event_analytics ea WHERE ea.event_id = p_event_id AND ea.actor_id = p_actor_id AND ea.date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date) ), -- Ризици помеѓу парови актери risks AS ( SELECT cr.actor1_id, cr.actor2_id, cr.predicted_date, cr.risk_score FROM conflict_risk cr WHERE (cr.actor1_id = p_actor_id OR cr.actor2_id = p_actor_id) AND cr.predicted_date BETWEEN COALESCE(p_from_date,'1900-01-01') AND COALESCE(p_to_date, now()::date) ) SELECT jsonb_build_object( 'event_id', p_event_id, 'actor_id', p_actor_id, 'notifications', COALESCE(jsonb_agg(DISTINCT notif), '[]'::jsonb), 'predictions', COALESCE(jsonb_agg(DISTINCT preds), '[]'::jsonb), 'analytics', COALESCE(jsonb_agg(DISTINCT analytics), '[]'::jsonb), 'risks', COALESCE(jsonb_agg(DISTINCT risks), '[]'::jsonb) ) AS j FROM notif, preds, analytics, risks; END; $$;
Објаснување на Execution Plan
При повик на p_event_risk_overview
со EXPLAIN (ANALYZE, BUFFERS)
, PostgreSQL користи индекс скенирања за оптимален пристап:
- event_details →
Index Scan
наix_event_details_event
илиix_event_details_actor
за брзо наоѓање актери/локации поврзани со настанот. - notifications →
Index Scan
наix_notifications_user
(композитен индекс по user/date) илиix_notifications_event
за пребарување на кориснички нотификации. - predictions →
Index Scan
наix_predictions_event
илиix_predictions_actor
за селекција на предвидувања. - event_analytics →
Index Scan
наix_analytics_event_actor
со дополнително филтрирање поdate
. - conflict_risk →
Index Scan
наix_conflict_pair_date
за проверка на ризици меѓу актери во даден временски интервал.
Ова покажува дека секоја табела се пристапува преку соодветен индекс наместо со целосно скенирање, што значително ја зголемува брзината и ја намалува потрошувачката на ресурси при сложени аналитички пребарувања.
p_event_create
CREATE OR REPLACE PROCEDURE p_event_create( IN p_sql_date date, IN p_is_root_event boolean, IN p_event_code varchar(10), IN p_goldstein_scale numeric(5,2) DEFAULT NULL, IN p_num_mentions int DEFAULT NULL, IN p_num_sources int DEFAULT NULL, IN p_num_articles int DEFAULT NULL, IN p_avg_tone numeric(5,2) DEFAULT NULL, INOUT p_result refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ DECLARE v_id int; BEGIN INSERT INTO events ( sql_date, month_year, year, fraction_date, is_root_event, event_code, goldstein_scale, num_mentions, num_sources, num_articles, avg_tone ) VALUES ( p_sql_date, TO_CHAR(p_sql_date,'MMYYYY'), EXTRACT(YEAR FROM p_sql_date)::int, (EXTRACT(DOY FROM p_sql_date)::numeric / 365), COALESCE(p_is_root_event, TRUE), p_event_code, p_goldstein_scale, p_num_mentions, p_num_sources, p_num_articles, p_avg_tone ) RETURNING global_event_id INTO v_id; IF p_result IS NULL THEN p_result := 'cur_event_create'; END IF; OPEN p_result FOR SELECT to_jsonb(e) AS j FROM events e WHERE e.global_event_id = v_id; END; $$;
p_event_get
CREATE OR REPLACE PROCEDURE p_event_get( IN p_event_id int, INOUT p_result refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ BEGIN IF p_result IS NULL THEN p_result := 'cur_event_get'; END IF; OPEN p_result FOR SELECT COALESCE( (SELECT to_jsonb(e) FROM events e WHERE e.global_event_id = p_event_id), jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) ) AS j; END; $$;
p_event_list
CREATE OR REPLACE PROCEDURE p_event_list( IN p_limit int DEFAULT 50, IN p_offset int DEFAULT 0, INOUT p_result refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ BEGIN IF p_result IS NULL THEN p_result := 'cur_event_list'; END IF; OPEN p_result FOR WITH rows AS ( SELECT to_jsonb(e) AS j FROM events e ORDER BY e.global_event_id DESC LIMIT GREATEST(p_limit,0) OFFSET GREATEST(p_offset,0) ) SELECT jsonb_build_object( 'ok', true, 'limit', p_limit, 'offset', p_offset, 'results', COALESCE(jsonb_agg(j), '[]'::jsonb) ) AS j FROM rows; END; $$;
p_event_update
CREATE OR REPLACE PROCEDURE p_event_update( IN p_event_id int, IN p_sql_date date DEFAULT NULL, IN p_is_root_event boolean DEFAULT NULL, IN p_event_code varchar(10) DEFAULT NULL, IN p_goldstein_scale numeric(5,2) DEFAULT NULL, IN p_num_mentions int DEFAULT NULL, IN p_num_sources int DEFAULT NULL, IN p_num_articles int DEFAULT NULL, IN p_avg_tone numeric(5,2) DEFAULT NULL, INOUT p_result refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ DECLARE v_exists boolean; BEGIN SELECT EXISTS(SELECT 1 FROM events WHERE global_event_id = p_event_id) INTO v_exists; IF NOT v_exists THEN IF p_result IS NULL THEN p_result := 'cur_event_update'; END IF; OPEN p_result FOR SELECT jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) AS j; RETURN; END IF; UPDATE events SET sql_date = COALESCE(p_sql_date, sql_date), is_root_event = COALESCE(p_is_root_event, is_root_event), event_code = COALESCE(p_event_code, event_code), goldstein_scale= COALESCE(p_goldstein_scale, goldstein_scale), num_mentions = COALESCE(p_num_mentions, num_mentions), num_sources = COALESCE(p_num_sources, num_sources), num_articles = COALESCE(p_num_articles, num_articles), avg_tone = COALESCE(p_avg_tone, avg_tone), month_year = CASE WHEN p_sql_date IS NULL THEN month_year ELSE TO_CHAR(p_sql_date,'MMYYYY') END, year = CASE WHEN p_sql_date IS NULL THEN year ELSE EXTRACT(YEAR FROM p_sql_date)::int END, fraction_date = CASE WHEN p_sql_date IS NULL THEN fraction_date ELSE (EXTRACT(DOY FROM p_sql_date)::numeric / 365) END WHERE global_event_id = p_event_id; IF p_result IS NULL THEN p_result := 'cur_event_update'; END IF; OPEN p_result FOR SELECT to_jsonb(e) AS j FROM events e WHERE e.global_event_id = p_event_id; END; $$;
p_event_delete
CREATE OR REPLACE PROCEDURE p_event_delete( IN p_event_id int, INOUT p_result refcursor DEFAULT NULL ) LANGUAGE plpgsql AS $$ DECLARE v_ct int; BEGIN DELETE FROM events WHERE global_event_id = p_event_id; GET DIAGNOSTICS v_ct = ROW_COUNT; IF p_result IS NULL THEN p_result := 'cur_event_delete'; END IF; IF v_ct = 0 THEN OPEN p_result FOR SELECT jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) AS j; ELSE OPEN p_result FOR SELECT jsonb_build_object('ok', true, 'deleted_id', p_event_id) AS j; END IF; END; $$;
Заклучок
Со оваа комбинација на тригери, трансакициски операции, индекси и складирани процедури, Django апликацијата обезбедува:
- Автоматско ажурирање на агрегирани табели
- Валидација на бизнис правила на податочно ниво
- Оптимизирано пребарување со индекси
- Стандарден JSON излез преку PostgreSQL процедури