| Version 5 (modified by , 4 weeks ago) ( diff ) |
|---|
Напреден апликативен развој — Anomalyzer
Имплементирани се:
- Изведени колони (year, month_year, fraction_date) преку BEFORE тригер
- Автоматска дневна агрегирана табела events_daily_agg по INSERT/UPDATE/DELETE
- Бизнис правила со CHECK и тригер (граници на tone; non-root настаните мора да имаат mentions)
- Оптимизација на перформанси со индекси за чести join/filter полиња
Тригери
BEFORE INSERT/UPDATE на events
Нормализација на кодови, проверка на валидност и автоматско пополнување на деривирани полиња (year, month_year, fraction_date).
-- Row-level BEFORE trigger function for events.
--
-- Normalises and validates the incoming row, then fills the columns that are
-- derived from sql_date:
--   * event_code is upper-cased;
--   * avg_tone, when present, must lie in [-10, 10];
--   * year / month_year / fraction_date are recomputed whenever sql_date is set;
--   * a row whose is_root_event is FALSE must carry num_mentions > 0.
--
-- Returns: the (possibly modified) NEW row.
-- Raises:  an exception for an out-of-range avg_tone or for a non-root event
--          without mentions.
CREATE OR REPLACE FUNCTION trg_events_before_ins_upd()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    -- upper(NULL) is NULL, so a missing code passes through untouched.
    NEW.event_code := upper(NEW.event_code);

    -- A NULL avg_tone makes the whole condition NULL, which skips the branch.
    IF NOT (NEW.avg_tone BETWEEN -10 AND 10) THEN
        RAISE EXCEPTION 'avg_tone out of range [-10,10]: %', NEW.avg_tone;
    END IF;

    IF NEW.sql_date IS NOT NULL THEN
        NEW.month_year    := to_char(NEW.sql_date, 'MMYYYY');
        NEW.year          := date_part('year', NEW.sql_date)::int;
        -- Day-of-year over a fixed 365-day year.
        NEW.fraction_date := EXTRACT(DOY FROM NEW.sql_date) / 365.0;
    END IF;

    -- Only an explicit FALSE marks a non-root event; NULL is not checked.
    IF NEW.is_root_event IS NOT DISTINCT FROM FALSE
       AND COALESCE(NEW.num_mentions, 0) <= 0
    THEN
        RAISE EXCEPTION 'non-root events must have num_mentions > 0';
    END IF;

    RETURN NEW;
END;
$$;
-- Attach the normalisation/validation function to every row written to events.
-- The trigger is dropped first so the script can be re-run, matching the
-- CREATE OR REPLACE / IF NOT EXISTS statements used elsewhere in this page.
-- Run both statements in one transaction so no write slips through untriggered.
DROP TRIGGER IF EXISTS events_before_ins_upd ON events;
CREATE TRIGGER events_before_ins_upd
    BEFORE INSERT OR UPDATE ON events
    FOR EACH ROW
    EXECUTE FUNCTION trg_events_before_ins_upd();
AFTER INSERT/UPDATE/DELETE на events
Автоматско пресметување на дневна агрегација и чување во табелата events_daily_agg.
-- Daily roll-up of events: one row per calendar day.
--   sql_date     day being summarised (primary key)
--   events_count number of events recorded for that day
--   avg_tone     mean avg_tone of that day's events; NULL when none has a tone
CREATE TABLE IF NOT EXISTS events_daily_agg (
    sql_date     date           NOT NULL,
    events_count integer        NOT NULL,
    avg_tone     numeric(6, 4),
    CONSTRAINT events_daily_agg_pkey PRIMARY KEY (sql_date)
);
-- Row-level AFTER trigger function that keeps events_daily_agg in sync with
-- events.
--
-- For every day touched by the triggering row it recomputes the event count
-- and the mean avg_tone from events, upserts the result into events_daily_agg,
-- and removes the aggregate row when the day has no events left.
--
-- An UPDATE can move an event from one day to another, so both OLD.sql_date and
-- NEW.sql_date are refreshed; refreshing only the new day would leave the old
-- day's count and average stale.
--
-- Returns: NULL (the return value of an AFTER row trigger is ignored).
CREATE OR REPLACE FUNCTION trg_events_after_ins_upd_del()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
    v_dates date[];  -- days whose aggregate must be refreshed
BEGIN
    -- Pick the row images that exist for this operation instead of reading
    -- NEW on DELETE / OLD on INSERT.
    IF TG_OP = 'INSERT' THEN
        v_dates := ARRAY[NEW.sql_date];
    ELSIF TG_OP = 'DELETE' THEN
        v_dates := ARRAY[OLD.sql_date];
    ELSE
        v_dates := ARRAY[OLD.sql_date, NEW.sql_date];
    END IF;

    -- Recompute the days that still have events. GROUP BY yields one row per
    -- day, so the upsert never touches the same key twice even when OLD and
    -- NEW share a date. NULL dates match nothing and are skipped.
    INSERT INTO events_daily_agg (sql_date, events_count, avg_tone)
    SELECT
        e.sql_date,
        COUNT(*),
        AVG(e.avg_tone)
    FROM events AS e
    WHERE e.sql_date = ANY (v_dates)
    GROUP BY e.sql_date
    ON CONFLICT (sql_date) DO UPDATE
        SET events_count = EXCLUDED.events_count,
            avg_tone     = EXCLUDED.avg_tone;

    -- Drop aggregates for touched days that no longer have any event.
    DELETE FROM events_daily_agg AS a
    WHERE a.sql_date = ANY (v_dates)
      AND NOT EXISTS (
          SELECT 1 FROM events AS e WHERE e.sql_date = a.sql_date
      );

    RETURN NULL;
END;
$$;
-- Fire the daily-aggregate refresh after every row-level write to events.
-- One trigger per operation, all sharing the same function. Each trigger is
-- dropped first so the script can be re-run; execute it in one transaction so
-- no write happens while a trigger is missing.
DROP TRIGGER IF EXISTS events_after_ins ON events;
CREATE TRIGGER events_after_ins
    AFTER INSERT ON events
    FOR EACH ROW
    EXECUTE FUNCTION trg_events_after_ins_upd_del();

DROP TRIGGER IF EXISTS events_after_upd ON events;
CREATE TRIGGER events_after_upd
    AFTER UPDATE ON events
    FOR EACH ROW
    EXECUTE FUNCTION trg_events_after_ins_upd_del();

DROP TRIGGER IF EXISTS events_after_del ON events;
CREATE TRIGGER events_after_del
    AFTER DELETE ON events
    FOR EACH ROW
    EXECUTE FUNCTION trg_events_after_ins_upd_del();
Трансакции
CRUD операциите врз events се извршуваат во рамки на трансакциски блокови.
Во Django, ова се постигнува со @transaction.atomic, што овозможува автоматски ROLLBACK при грешка (нпр. кога тригер ќе подигне исклучок).
Пример:
# src/api/views/events.py
from django.db import transaction
from rest_framework.response import Response
from rest_framework.views import APIView
from .services.events import event_create
class EventListCreate(APIView):
    """API view whose POST handler creates an event inside one transaction."""

    @transaction.atomic
    def post(self, request):
        """Create an event from the request body and answer with HTTP 201.

        The handler runs under ``transaction.atomic``; the accompanying text
        describes this as giving an automatic ROLLBACK when an error is raised
        (for example by a database trigger).
        """
        # event_create is noted in the original as the call to p_event_create.
        created = event_create(request.data)
        return Response(created, status=201)
p_event_bulk_create
-- Bulk-insert events from a JSON array.
--
-- Parameters:
--   _items  jsonb array of objects with the keys sql_date, is_root_event,
--           event_code, goldstein_scale, num_mentions, num_sources,
--           num_articles, avg_tone. is_root_event defaults to TRUE; empty
--           strings in the numeric fields are treated as NULL.
--   cur     refcursor opened on return; it yields one row with a single jsonb
--           column j: {"ok": true, "inserted_count": n, "inserted_ids": [...]}.
--           inserted_ids is [] (not null) when nothing was inserted.
--
-- Raises: an exception when _items is NULL or not a JSON array; any error from
--         the INSERT (casts, constraints, triggers) propagates to the caller.
--
-- The INSERT runs while the procedure executes and only its stored result is
-- exposed through the cursor, so the write does not depend on the caller
-- fetching from the cursor.
CREATE OR REPLACE PROCEDURE p_event_bulk_create(
    IN _items jsonb,
    INOUT cur refcursor
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_result jsonb;
BEGIN
    -- IS DISTINCT FROM also rejects a NULL payload, which "<>" would let through.
    IF jsonb_typeof(_items) IS DISTINCT FROM 'array' THEN
        RAISE EXCEPTION 'payload must be a JSON array';
    END IF;

    WITH ins AS (
        INSERT INTO events (
            sql_date, is_root_event, event_code, goldstein_scale,
            num_mentions, num_sources, num_articles, avg_tone,
            month_year, year, fraction_date
        )
        SELECT
            (x->>'sql_date')::date,
            COALESCE((x->>'is_root_event')::boolean, TRUE),
            (x->>'event_code')::text,
            NULLIF(x->>'goldstein_scale', '')::numeric,
            NULLIF(x->>'num_mentions', '')::int,
            NULLIF(x->>'num_sources', '')::int,
            NULLIF(x->>'num_articles', '')::int,
            NULLIF(x->>'avg_tone', '')::numeric,
            to_char((x->>'sql_date')::date, 'MMYYYY'),
            EXTRACT(YEAR FROM (x->>'sql_date')::date)::int,
            EXTRACT(DOY FROM (x->>'sql_date')::date) / 365.0
        FROM jsonb_array_elements(_items) AS x
        RETURNING global_event_id
    )
    SELECT jsonb_build_object(
        'ok', true,
        'inserted_count', COUNT(*),
        'inserted_ids', COALESCE(jsonb_agg(ins.global_event_id), '[]'::jsonb)
    )
    INTO v_result
    FROM ins;

    -- A NULL cur gets an automatically generated portal name from OPEN.
    OPEN cur FOR SELECT v_result AS j;
END;
$$;
p_event_bulk_apply
-- Multiple operations in one go: INSERT + UPDATE + DELETE
-- Payload shape:
-- {
--   "insert": [
--     {
--       "sql_date": "2025-09-01",
--       "is_root_event": true,
--       "event_code": "E123",
--       "goldstein_scale": 1.5,
--       "num_mentions": 5,
--       "num_sources": 2,
--       "num_articles": 3,
--       "avg_tone": 0.75
--     }, ...
--   ],
--   "update": [
--     {
--       "global_event_id": 42,
--       "sql_date": "2025-09-02",
--       "num_mentions": 7,
--       "avg_tone": 0.1
--     }, ...
--   ],
--   "delete": [ 10, 11, 12 ]
-- }
--
-- Parameters:
--   _payload  jsonb object; each of the keys insert / update / delete is
--             optional and, when present, must be an array.
--   cur       refcursor opened on return (named 'cur_event_bulk_apply' when
--             passed as NULL). It yields one row with a single jsonb column j
--             holding ok, inserted/updated/deleted counts and id arrays.
--
-- Raises: an exception when the payload is NULL, not an object, or one of the
--         three keys is not an array; errors from the DML propagate unchanged.
--
-- Notes:
--   * All three operations run in ONE statement, so they share a snapshot:
--     "update" and "delete" cannot target rows created by "insert" in the
--     same call.
--   * In "update", a field that is absent or null keeps its current value.
--   * The statement executes while the procedure runs; the cursor only exposes
--     the stored summary, so the writes do not depend on the caller fetching.
CREATE OR REPLACE PROCEDURE p_event_bulk_apply(
    IN _payload jsonb,
    INOUT cur refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_result jsonb;
BEGIN
    -- Basic validation
    IF _payload IS NULL OR jsonb_typeof(_payload) <> 'object' THEN
        RAISE EXCEPTION 'payload must be a JSON object with optional keys: insert, update, delete';
    END IF;
    IF _payload ? 'insert' AND jsonb_typeof(_payload->'insert') <> 'array' THEN
        RAISE EXCEPTION '"insert" must be an array';
    END IF;
    IF _payload ? 'update' AND jsonb_typeof(_payload->'update') <> 'array' THEN
        RAISE EXCEPTION '"update" must be an array';
    END IF;
    IF _payload ? 'delete' AND jsonb_typeof(_payload->'delete') <> 'array' THEN
        RAISE EXCEPTION '"delete" must be an array of IDs';
    END IF;

    WITH
    -- 1) INSERT many (optional). The INSERT must sit directly in the WITH
    --    list; PostgreSQL does not accept it wrapped in a FROM subquery.
    ins AS (
        INSERT INTO events (
            sql_date, is_root_event, event_code, goldstein_scale,
            num_mentions, num_sources, num_articles, avg_tone,
            month_year, year, fraction_date
        )
        SELECT
            (x->>'sql_date')::date,
            COALESCE((x->>'is_root_event')::boolean, TRUE),
            UPPER((x->>'event_code')::text),
            NULLIF(x->>'goldstein_scale', '')::numeric,
            NULLIF(x->>'num_mentions', '')::int,
            NULLIF(x->>'num_sources', '')::int,
            NULLIF(x->>'num_articles', '')::int,
            NULLIF(x->>'avg_tone', '')::numeric,
            TO_CHAR((x->>'sql_date')::date, 'MMYYYY'),
            EXTRACT(YEAR FROM (x->>'sql_date')::date)::int,
            EXTRACT(DOY FROM (x->>'sql_date')::date)::numeric / 365.0
        FROM jsonb_array_elements(COALESCE(_payload->'insert', '[]'::jsonb)) AS x
        RETURNING global_event_id
    ),
    -- 2) UPDATE many (optional). Numeric fields are read as plain numeric,
    --    like the insert path, so only the column definition decides rounding.
    upd_src AS (
        SELECT
            r.global_event_id, r.sql_date, r.is_root_event, r.event_code,
            r.goldstein_scale, r.num_mentions, r.num_sources, r.num_articles,
            r.avg_tone
        FROM jsonb_to_recordset(COALESCE(_payload->'update', '[]'::jsonb)) AS r(
            global_event_id int,
            sql_date date,
            is_root_event boolean,
            event_code text,
            goldstein_scale numeric,
            num_mentions int,
            num_sources int,
            num_articles int,
            avg_tone numeric
        )
    ),
    upd AS (
        UPDATE events e
        SET
            sql_date        = COALESCE(u.sql_date, e.sql_date),
            is_root_event   = COALESCE(u.is_root_event, e.is_root_event),
            event_code      = COALESCE(UPPER(u.event_code), e.event_code),
            goldstein_scale = COALESCE(u.goldstein_scale, e.goldstein_scale),
            num_mentions    = COALESCE(u.num_mentions, e.num_mentions),
            num_sources     = COALESCE(u.num_sources, e.num_sources),
            num_articles    = COALESCE(u.num_articles, e.num_articles),
            avg_tone        = COALESCE(u.avg_tone, e.avg_tone),
            -- Derived columns change only when a new sql_date is supplied.
            month_year      = CASE WHEN u.sql_date IS NULL THEN e.month_year
                                   ELSE TO_CHAR(u.sql_date, 'MMYYYY') END,
            year            = CASE WHEN u.sql_date IS NULL THEN e.year
                                   ELSE EXTRACT(YEAR FROM u.sql_date)::int END,
            fraction_date   = CASE WHEN u.sql_date IS NULL THEN e.fraction_date
                                   ELSE (EXTRACT(DOY FROM u.sql_date)::numeric / 365.0) END
        FROM upd_src u
        WHERE e.global_event_id = u.global_event_id
        RETURNING e.global_event_id
    ),
    -- 3) DELETE many (optional)
    del_ids AS (
        SELECT (x)::int AS id
        FROM jsonb_array_elements_text(COALESCE(_payload->'delete', '[]'::jsonb)) AS x
    ),
    del AS (
        DELETE FROM events e
        USING del_ids d
        WHERE e.global_event_id = d.id
        RETURNING e.global_event_id
    )
    SELECT jsonb_build_object(
        'ok', true,
        'inserted_count', (SELECT COUNT(*) FROM ins),
        'inserted_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM ins), '[]'::jsonb),
        'updated_count', (SELECT COUNT(*) FROM upd),
        'updated_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM upd), '[]'::jsonb),
        'deleted_count', (SELECT COUNT(*) FROM del),
        'deleted_ids', COALESCE((SELECT jsonb_agg(global_event_id) FROM del), '[]'::jsonb)
    )
    INTO v_result;

    IF cur IS NULL THEN cur := 'cur_event_bulk_apply'; END IF;
    OPEN cur FOR SELECT v_result AS j;
END;
$$;
Индекси
За перформанси при пребарување и join операции, креирани се индекси:
-- Supporting indexes for frequent join/filter columns.
-- IF NOT EXISTS keeps the script re-runnable, like the CREATE OR REPLACE /
-- IF NOT EXISTS statements used for the other objects on this page.
CREATE INDEX IF NOT EXISTS ix_event_details_event ON event_details (global_event_id);
CREATE INDEX IF NOT EXISTS ix_event_details_actor ON event_details (actor_id);
CREATE INDEX IF NOT EXISTS ix_event_details_loc ON event_details (location_id);
CREATE INDEX IF NOT EXISTS ix_notifications_user ON notifications (user_id, notification_date);
CREATE INDEX IF NOT EXISTS ix_notifications_event ON notifications (event_id);
CREATE INDEX IF NOT EXISTS ix_predictions_event ON predictions (event_id);
CREATE INDEX IF NOT EXISTS ix_predictions_actor ON predictions (actor_id);
CREATE INDEX IF NOT EXISTS ix_analytics_event_actor ON event_analytics (event_id, actor_id, date);
CREATE INDEX IF NOT EXISTS ix_conflict_pair_date ON conflict_risk (actor1_id, actor2_id, predicted_date);
Складирани процедури
Имплементирани се CRUD процедури за events, со JSON резултат погоден за директен REST одговор.
p_event_create
-- Insert a single event and hand the stored row back as JSON.
--
-- Parameters:
--   p_sql_date .. p_avg_tone  column values for the new event; p_is_root_event
--                             falls back to TRUE when NULL.
--   p_result                  refcursor opened on return (named
--                             'cur_event_create' when passed as NULL). It
--                             selects the new row as a single jsonb column j.
--
-- month_year, year and fraction_date are computed here from p_sql_date.
CREATE OR REPLACE PROCEDURE p_event_create(
    IN p_sql_date date,
    IN p_is_root_event boolean,
    IN p_event_code varchar(10),
    IN p_goldstein_scale numeric(5,2) DEFAULT NULL,
    IN p_num_mentions int DEFAULT NULL,
    IN p_num_sources int DEFAULT NULL,
    IN p_num_articles int DEFAULT NULL,
    IN p_avg_tone numeric(5,2) DEFAULT NULL,
    INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE
    new_event_id int;
BEGIN
    INSERT INTO events (
        sql_date,
        month_year,
        year,
        fraction_date,
        is_root_event,
        event_code,
        goldstein_scale,
        num_mentions,
        num_sources,
        num_articles,
        avg_tone
    )
    SELECT
        p_sql_date,
        TO_CHAR(p_sql_date, 'MMYYYY'),
        EXTRACT(YEAR FROM p_sql_date)::int,
        -- Day-of-year over a fixed 365-day year.
        (EXTRACT(DOY FROM p_sql_date)::numeric / 365),
        COALESCE(p_is_root_event, TRUE),
        p_event_code,
        p_goldstein_scale,
        p_num_mentions,
        p_num_sources,
        p_num_articles,
        p_avg_tone
    RETURNING global_event_id INTO new_event_id;

    -- Use the documented cursor name unless the caller supplied one.
    p_result := COALESCE(p_result, 'cur_event_create'::refcursor);

    OPEN p_result FOR
        SELECT to_jsonb(e) AS j
        FROM events e
        WHERE e.global_event_id = new_event_id;
END;
$$;
p_event_get
-- Fetch one event by id as JSON.
--
-- Parameters:
--   p_event_id  global_event_id to look up.
--   p_result    refcursor opened on return (named 'cur_event_get' when passed
--               as NULL). It yields exactly one row with a jsonb column j:
--               the event row, or {"ok": false, "error": "not_found", "id": ...}
--               when no event has that id.
CREATE OR REPLACE PROCEDURE p_event_get(
    IN p_event_id int,
    INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE
    -- Payload returned when the lookup finds nothing.
    v_not_found jsonb := jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id);
BEGIN
    p_result := COALESCE(p_result, 'cur_event_get'::refcursor);

    OPEN p_result FOR
        SELECT COALESCE(
            (
                SELECT to_jsonb(ev)
                FROM events ev
                WHERE ev.global_event_id = p_event_id
            ),
            v_not_found
        ) AS j;
END;
$$;
p_event_list
-- List events, newest id first, as one JSON document.
--
-- Parameters:
--   p_limit   maximum number of events (default 50); negative values act as 0.
--   p_offset  number of events to skip (default 0); negative values act as 0.
--   p_result  refcursor opened on return (named 'cur_event_list' when passed as
--             NULL). It yields one row with a jsonb column j:
--             {"ok": true, "limit": p_limit, "offset": p_offset, "results": [...]}
--             where limit/offset echo the arguments as given and results is []
--             when the page is empty.
--
-- The array order is fixed with ORDER BY inside jsonb_agg; relying on the order
-- of the rows coming out of the CTE is not guaranteed by PostgreSQL.
CREATE OR REPLACE PROCEDURE p_event_list(
    IN p_limit int DEFAULT 50,
    IN p_offset int DEFAULT 0,
    INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
BEGIN
    IF p_result IS NULL THEN p_result := 'cur_event_list'; END IF;

    OPEN p_result FOR
        WITH page AS (
            SELECT
                e.global_event_id,
                to_jsonb(e) AS j
            FROM events e
            ORDER BY e.global_event_id DESC
            LIMIT GREATEST(p_limit, 0) OFFSET GREATEST(p_offset, 0)
        )
        SELECT jsonb_build_object(
            'ok', true,
            'limit', p_limit,
            'offset', p_offset,
            'results', COALESCE(
                jsonb_agg(page.j ORDER BY page.global_event_id DESC),
                '[]'::jsonb
            )
        ) AS j
        FROM page;
END;
$$;
p_event_update
-- Partially update one event and return the stored row as JSON.
--
-- Parameters:
--   p_event_id                global_event_id of the event to change.
--   p_sql_date .. p_avg_tone  new values; a NULL argument keeps the current
--                             column value (a column cannot be set to NULL
--                             through this procedure).
--   p_result                  refcursor opened on return (named
--                             'cur_event_update' when passed as NULL). It
--                             yields a jsonb column j: the updated row, or
--                             {"ok": false, "error": "not_found", "id": ...}.
--
-- month_year, year and fraction_date are recomputed only when p_sql_date is
-- supplied.
--
-- Existence is decided by the UPDATE itself (FOUND) instead of a separate
-- EXISTS query: one lookup fewer, and no gap between the check and the write.
CREATE OR REPLACE PROCEDURE p_event_update(
    IN p_event_id int,
    IN p_sql_date date DEFAULT NULL,
    IN p_is_root_event boolean DEFAULT NULL,
    IN p_event_code varchar(10) DEFAULT NULL,
    IN p_goldstein_scale numeric(5,2) DEFAULT NULL,
    IN p_num_mentions int DEFAULT NULL,
    IN p_num_sources int DEFAULT NULL,
    IN p_num_articles int DEFAULT NULL,
    IN p_avg_tone numeric(5,2) DEFAULT NULL,
    INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
BEGIN
    IF p_result IS NULL THEN p_result := 'cur_event_update'; END IF;

    UPDATE events SET
        sql_date        = COALESCE(p_sql_date, sql_date),
        is_root_event   = COALESCE(p_is_root_event, is_root_event),
        event_code      = COALESCE(p_event_code, event_code),
        goldstein_scale = COALESCE(p_goldstein_scale, goldstein_scale),
        num_mentions    = COALESCE(p_num_mentions, num_mentions),
        num_sources     = COALESCE(p_num_sources, num_sources),
        num_articles    = COALESCE(p_num_articles, num_articles),
        avg_tone        = COALESCE(p_avg_tone, avg_tone),
        month_year      = CASE WHEN p_sql_date IS NULL THEN month_year
                               ELSE TO_CHAR(p_sql_date, 'MMYYYY') END,
        year            = CASE WHEN p_sql_date IS NULL THEN year
                               ELSE EXTRACT(YEAR FROM p_sql_date)::int END,
        fraction_date   = CASE WHEN p_sql_date IS NULL THEN fraction_date
                               ELSE (EXTRACT(DOY FROM p_sql_date)::numeric / 365) END
    WHERE global_event_id = p_event_id;

    -- FOUND is false when the UPDATE matched no row.
    IF NOT FOUND THEN
        OPEN p_result FOR
            SELECT jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id) AS j;
        RETURN;
    END IF;

    OPEN p_result FOR
        SELECT to_jsonb(e) AS j
        FROM events e
        WHERE e.global_event_id = p_event_id;
END;
$$;
p_event_delete
-- Delete one event by id and report the outcome as JSON.
--
-- Parameters:
--   p_event_id  global_event_id of the event to remove.
--   p_result    refcursor opened on return (named 'cur_event_delete' when
--               passed as NULL). It yields one row with a jsonb column j:
--               {"ok": true, "deleted_id": ...} when a row was removed,
--               {"ok": false, "error": "not_found", "id": ...} otherwise.
CREATE OR REPLACE PROCEDURE p_event_delete(
    IN p_event_id int,
    INOUT p_result refcursor DEFAULT NULL
)
LANGUAGE plpgsql
AS $$
DECLARE
    v_outcome jsonb;
BEGIN
    DELETE FROM events WHERE global_event_id = p_event_id;

    -- FOUND is true when the DELETE removed at least one row.
    IF FOUND THEN
        v_outcome := jsonb_build_object('ok', true, 'deleted_id', p_event_id);
    ELSE
        v_outcome := jsonb_build_object('ok', false, 'error', 'not_found', 'id', p_event_id);
    END IF;

    p_result := COALESCE(p_result, 'cur_event_delete'::refcursor);
    OPEN p_result FOR
        SELECT v_outcome AS j;
END;
$$;
Заклучок
Со оваа комбинација на тригери, трансакциски операции, индекси и складирани процедури, Django апликацијата обезбедува:
- Автоматско ажурирање на агрегирани табели
- Валидација на бизнис правила на податочно ниво
- Оптимизирано пребарување со индекси
- Стандарден JSON излез преку PostgreSQL процедури
