Recommendation System for worker feed with vector database
Our recommendation system uses a vector database to populate the worker's feed with the open task requests that are closest to the tasks the worker usually takes on.
We suggest task requests that are closest to the worker's past completed tasks, with a fallback: if the worker has not completed any tasks yet, we match task requests against the experience description the worker gave for each of their categories. For this purpose we added a column called experience_description to the worker_category table. It holds the description that the worker provided when signing up for the given category.
We will look at how both of the strategies are implemented.
1. Personalized Recommendations
This strategy recommends the open task requests that are most similar to the tasks a worker has already completed. We first embedded the open task requests. Then, as a starting point, we embedded three task requests per worker that had resulted in a completed task; we found this number sufficient to bootstrap the app. We built these embeddings with the script embed_taskrequests.py and store the vectors in the following table:
CREATE TABLE task_request_embeddings (
    task_request_id INT PRIMARY KEY
        REFERENCES TaskRequest(id)
        ON DELETE CASCADE
        ON UPDATE CASCADE,
    embedding vector(384) NOT NULL,
    embedded_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);
Once we had embedded all the task requests we needed (the completed ones to build the worker profiles and the open ones to recommend), we built the worker recommendation profiles. We did this with the Python script embed_worker_profiles.py and store the profiles in the table:
CREATE TABLE worker_recommendation_profiles (
    worker_id INT PRIMARY KEY
        REFERENCES Worker(id)
        ON DELETE CASCADE,
    preference_embedding vector(384),
    updated_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);
With all this done, we are ready to compare the worker recommendation profile embeddings with the open task request ones and build the worker feed. We did this with the function recommend_task_requests_for_worker:
CREATE OR REPLACE FUNCTION recommend_task_requests_for_worker(
    p_worker_id BIGINT,
    p_limit INT DEFAULT 50
)
RETURNS TABLE (
    task_request_id BIGINT,
    distance DOUBLE PRECISION,
    recommendation_type TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
    -- Personalized branch: the worker has a non-null preference embedding.
    IF EXISTS (
        SELECT 1
        FROM worker_recommendation_profiles wrp
        WHERE wrp.worker_id = p_worker_id
          AND wrp.preference_embedding IS NOT NULL
    ) THEN
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            -- <=> is the pgvector cosine distance operator.
            tre.embedding <=> wrp.preference_embedding AS distance,
            'PERSONALIZED'::TEXT
        FROM TaskRequest tr
        JOIN task_request_embeddings tre
            ON tre.task_request_id = tr.id
        JOIN worker_recommendation_profiles wrp
            ON wrp.worker_id = p_worker_id
        WHERE tre.embedding IS NOT NULL
          -- Skip task requests the worker has already made an offer on.
          AND tr.id NOT IN (
              SELECT o.task_request_id
              FROM Offer o
              WHERE o.worker_id = p_worker_id
          )
        ORDER BY distance ASC
        LIMIT p_limit;
    ELSE
        -- Fallback branch: compare task request descriptions with the
        -- worker's category experience descriptions.
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            tre.embedding <=> wce.embedding AS distance,
            'CATEGORY_BASED'::TEXT
        FROM TaskRequest tr
        JOIN taskrequestvectorv3 tre
            ON tre.task_request_id = tr.id
        JOIN WorkerCategory wc
            ON wc.worker_id = p_worker_id
        JOIN workercategoryvectorv3 wce
            ON wce.worker_category_id = wc.id
        WHERE tre.embedding IS NOT NULL
          AND tr.id NOT IN (
              SELECT o.task_request_id
              FROM Offer o
              WHERE o.worker_id = p_worker_id
          )
        ORDER BY distance ASC
        LIMIT p_limit;
    END IF;
END;
$$;
For faster search we used the indexes:
CREATE INDEX IF NOT EXISTS idx_task_request_embeddings_hnsw
    ON task_request_embeddings
    USING hnsw (embedding vector_cosine_ops);
CREATE INDEX IF NOT EXISTS idx_worker_category_embeddings_hnsw
    ON workercategoryvectorv3
    USING hnsw (embedding vector_cosine_ops);
We can now use:
SELECT * FROM recommend_task_requests_for_worker(19, 100);
to get up to 100 recommended task requests for the worker with worker_id = 19.
The ELSE block in recommend_task_requests_for_worker performs a different comparison from the one explained above. It is the fallback used when the worker has no recommendation profile, and we discuss it in section 2.
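To make the ranking step concrete, here is a minimal Python sketch of what the personalized branch computes. It is an illustration only, not the code that runs in the database: the names cosine_distance and recommend are hypothetical, the vectors are two-dimensional toy values instead of 384-dimensional embeddings, and the exhaustive scan stands in for the HNSW index search.

```python
import math


def cosine_distance(a, b):
    """Cosine distance, i.e. 1 - cosine similarity (what pgvector's <=> returns)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def recommend(profile, task_embeddings, offered_task_ids, limit=50):
    """Rank task requests by distance to the worker profile.

    task_embeddings maps task_request_id -> embedding (toy data here).
    Task requests the worker already made an offer on are skipped,
    mirroring the NOT IN (SELECT ... FROM Offer) filter.
    """
    scored = [
        (task_id, cosine_distance(embedding, profile))
        for task_id, embedding in task_embeddings.items()
        if task_id not in offered_task_ids
    ]
    scored.sort(key=lambda pair: pair[1])  # smallest distance first
    return scored[:limit]


# Toy example: task 4 is excluded because the worker already offered on it.
profile = [1.0, 0.0]
tasks = {1: [1.0, 0.0], 2: [0.0, 1.0], 3: [1.0, 1.0], 4: [0.9, 0.1]}
top_two = recommend(profile, tasks, offered_task_ids={4}, limit=2)
```

In this toy data, task 1 points in the same direction as the profile and ranks first, followed by task 3.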
Keeping Worker Profiles Up to Date
The worker recommendation profiles must evolve as workers complete more tasks. As noted, each profile starts from three completed tasks, and we recalculate it every time the worker completes another task. For this purpose we define:
CREATE OR REPLACE FUNCTION recalculate_worker_profile(
    p_worker_id BIGINT
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Average the embeddings of all task requests that led to a completed
    -- task for this worker, then insert or update the worker's profile.
    INSERT INTO worker_recommendation_profiles (
        worker_id,
        preference_embedding,
        updated_at
    )
    SELECT
        o.worker_id,
        AVG(tre.embedding),
        CURRENT_TIMESTAMP
    FROM Task t
    JOIN Offer o
        ON o.id = t.offer_id
    JOIN task_request_embeddings tre
        ON tre.task_request_id = o.task_request_id
    WHERE o.worker_id = p_worker_id
      AND t.status = 'COMPLETED'
    GROUP BY o.worker_id
    ON CONFLICT (worker_id)
    DO UPDATE SET
        preference_embedding = EXCLUDED.preference_embedding,
        updated_at = CURRENT_TIMESTAMP;
END;
$$;
CREATE OR REPLACE FUNCTION trg_recalculate_worker_profile()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
    v_worker_id BIGINT;
BEGIN
    -- React only when a task is inserted as COMPLETED or its status
    -- changes to COMPLETED.
    IF NEW.status = 'COMPLETED'
       AND (TG_OP = 'INSERT' OR OLD.status IS DISTINCT FROM NEW.status)
    THEN
        -- The worker is reached through the offer attached to the task.
        SELECT o.worker_id
        INTO v_worker_id
        FROM Offer o
        WHERE o.id = NEW.offer_id;
        PERFORM recalculate_worker_profile(v_worker_id);
    END IF;
    RETURN NEW;
END;
$$;
CREATE TRIGGER after_task_refresh_worker_profile
    AFTER INSERT OR UPDATE OF status
    ON Task
    FOR EACH ROW
    EXECUTE FUNCTION trg_recalculate_worker_profile();
With this we keep the worker's profile up to date with the worker's completed tasks.
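The two pieces above can be sketched in Python as follows: the condition under which the trigger recalculates, and the element-wise mean that AVG(tre.embedding) produces. This is an illustrative sketch under assumptions, not the database code; the names should_recalculate and average_embedding are hypothetical, and the vectors are short toy values.

```python
def should_recalculate(op, old_status, new_status):
    """Mirror of the trigger condition.

    op is 'INSERT' or 'UPDATE'. Python's != treats None like any other
    value, which matches IS DISTINCT FROM for a NULL old status.
    """
    return new_status == "COMPLETED" and (
        op == "INSERT" or old_status != new_status
    )


def average_embedding(embeddings):
    """Element-wise mean of equally sized vectors.

    Returns None for an empty list, which corresponds to the SQL
    inserting no profile row when the worker has no completed tasks.
    """
    if not embeddings:
        return None
    dim = len(embeddings[0])
    if any(len(e) != dim for e in embeddings):
        raise ValueError("all embeddings must have the same dimension")
    count = len(embeddings)
    return [sum(e[i] for e in embeddings) / count for i in range(dim)]


# Toy example with three completed tasks, matching the initial profile size.
profile = average_embedding([[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]])
```

Because the profile is a plain mean, every completed task has the same weight regardless of how long ago it was completed.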
For this approach to work, every new task request submitted by a client must be embedded before it can participate in the recommendation process.
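A new embedding can only be stored in task_request_embeddings if it has exactly 384 components, matching the vector(384) column. The helper below is a hypothetical sketch (to_pgvector_literal is not part of the attached scripts) that checks the dimension and formats the values in pgvector's text form, '[x1,x2,...]', so the result could be passed as a query parameter and cast to vector. The check for non-finite values is a defensive assumption.

```python
import math

EMBEDDING_DIM = 384  # matches the vector(384) columns used in the tables


def to_pgvector_literal(values, dim=EMBEDDING_DIM):
    """Format a sequence of floats as a pgvector text literal.

    Raises ValueError when the dimension does not match or when a
    component is NaN or infinite.
    """
    if len(values) != dim:
        raise ValueError(f"expected {dim} components, got {len(values)}")
    for value in values:
        if not math.isfinite(value):
            raise ValueError("embedding components must be finite numbers")
    return "[" + ",".join(repr(float(v)) for v in values) + "]"


# Toy example with a three-dimensional vector.
literal = to_pgvector_literal([0.5, -1.0, 2.0], dim=3)
```

Validating on the application side gives a clear error before the insert is attempted.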
2. Category Based Recommendation
If the worker has no completed tasks we will use the experience description that the worker provided when signing up for a category.
For this purpose we embed the experience description from worker_category, which gives one embedding per category per worker. We do this in the script embed_worker_category_description.py. We also embed the task requests by their description with the script embed_task_requests_description.py, and match the two sets of embeddings in the ELSE block of the recommend_task_requests_for_worker function.
Similarly, new worker category descriptions must also be embedded before they can be used for category-based recommendations.
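One property of the fallback query is worth illustrating. The join to WorkerCategory yields one row per pair of task request and worker category, so a worker registered in several categories may receive the same task request more than once. The sketch below is an assumption about how a caller might collapse such rows by keeping the smallest distance per task request; the helper name collapse_by_task is hypothetical and not part of the original function.

```python
def collapse_by_task(rows, limit=50):
    """Keep the smallest distance per task request and re-rank.

    rows is an iterable of (task_request_id, distance) pairs, as the
    fallback branch would return them for a worker with several categories.
    """
    best = {}
    for task_id, distance in rows:
        if task_id not in best or distance < best[task_id]:
            best[task_id] = distance
    ranked = sorted(best.items(), key=lambda item: item[1])
    return ranked[:limit]


# Toy example: task request 7 matches two of the worker's categories.
rows = [(7, 0.40), (7, 0.15), (9, 0.30)]
unique_rows = collapse_by_task(rows)
```

Taking the minimum treats a task request as relevant when it is close to any one of the worker's category descriptions.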
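To save the results from the scripts and to speed up the search we use the tables and indexes below. The function above and the index definitions refer to these tables as workercategoryvectorv3 and taskrequestvectorv3, so the table names must agree with whichever names are used in the deployed schema.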
CREATE TABLE worker_category_description_vector (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    worker_category_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (worker_category_id)
        REFERENCES WorkerCategory(id)
        ON DELETE CASCADE
);
CREATE TABLE task_request_description_vector (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    task_request_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (task_request_id)
        REFERENCES TaskRequest(id)
        ON DELETE CASCADE
);
CREATE INDEX idx_worker_category_embeddings_hnsw
    ON WorkerCategoryVectorv3
    USING hnsw (embedding vector_cosine_ops);
CREATE INDEX idx_task_request_desc_embeddings_hnsw
    ON TaskRequestVectorv3
    USING hnsw (embedding vector_cosine_ops);
Attachments (4)
- embed_taskrequests.py (4.1 KB ) - added by 28 hours ago.
- embed_worker_profiles.py (3.7 KB ) - added by 28 hours ago.
- embed_task_requests_description.py (1.5 KB ) - added by 21 hours ago.
- embed_worker_category_description.py (2.1 KB ) - added by 21 hours ago.
