Version 3 (modified by 231141, 22 hours ago)

Recommendation System for worker feed with vector database

The worker feed recommends the open task requests that are closest to the tasks a worker usually takes on. To find them, we store embeddings in a vector database and compare them by distance.

The system uses two strategies. By default, it suggests task requests that are closest to the worker's past completed tasks. As a fallback, if the worker has not completed any tasks yet, it matches task requests against the experience description the worker gave for each category. To support the fallback, we added a column called experience_description to the worker_category table. It holds the description that the worker provided when signing up for the given category.

The sections below describe how each strategy is implemented.

1. Personalized Recommendations

This strategy recommends the open task requests that are most similar to the tasks a worker has already completed. We first embedded the open task requests. Then, as a starting point, we embedded three task requests per worker that had resulted in a completed task. We found this number sufficient to bootstrap the app. We built these embeddings with the script embed_taskrequests.py and store the vectors in this table:

CREATE TABLE task_request_embeddings (
    task_request_id INT PRIMARY KEY
        REFERENCES TaskRequest(id)
        ON DELETE CASCADE
        ON UPDATE CASCADE,

    embedding vector(384) NOT NULL,

    embedded_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);
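
As an illustration of the storage step, the sketch below formats an embedding as the text form that a vector column accepts ('[x1,x2,...]') and checks that it has the 384 dimensions declared above. The helper name to_pgvector_literal and the UPSERT_SQL statement are assumptions made for this example; they are not taken from embed_taskrequests.py, and the statement assumes a PostgreSQL driver that uses %s placeholders.

```python
import math

EMBEDDING_DIM = 384  # matches vector(384) in task_request_embeddings


def to_pgvector_literal(values, dim=EMBEDDING_DIM):
    """Format a sequence of numbers as vector text input, e.g. '[0.1,0.2]'."""
    values = [float(v) for v in values]
    if len(values) != dim:
        raise ValueError(f"expected {dim} dimensions, got {len(values)}")
    if not all(math.isfinite(v) for v in values):
        raise ValueError("embedding contains NaN or infinity")
    return "[" + ",".join(repr(v) for v in values) + "]"


# Hypothetical parameterized statement (an assumption, not the script's actual SQL).
# Re-embedding the same task request overwrites the old vector.
UPSERT_SQL = (
    "INSERT INTO task_request_embeddings (task_request_id, embedding) "
    "VALUES (%s, %s::vector) "
    "ON CONFLICT (task_request_id) DO UPDATE "
    "SET embedding = EXCLUDED.embedding, embedded_at = CURRENT_TIMESTAMP"
)
```

Validating the dimension in the script gives a clearer error than letting the database reject the row.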

Once we had embedded all the task requests we needed (the completed ones to build the worker profiles and the open ones to recommend), we built the worker recommendation profiles. We did this with the Python script embed_worker_profiles.py and store the profiles in this table:

CREATE TABLE worker_recommendation_profiles (
    worker_id INT PRIMARY KEY
        REFERENCES Worker(id)
        ON DELETE CASCADE,
    preference_embedding vector(384),
    updated_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);

With both sets of embeddings in place, we can compare each worker's recommendation profile with the embeddings of the open task requests and build the worker feed. The function recommend_task_requests_for_worker does this:

CREATE OR REPLACE FUNCTION
recommend_task_requests_for_worker
(
    p_worker_id BIGINT,
    p_limit INT DEFAULT 50
)
RETURNS TABLE
(
    task_request_id BIGINT,
    distance DOUBLE PRECISION,
    recommendation_type TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
    IF EXISTS
    (
        SELECT 1
        FROM worker_recommendation_profiles wrp
        WHERE wrp.worker_id = p_worker_id
          AND wrp.preference_embedding IS NOT NULL
    )
    THEN
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            tre.embedding
            <=>
            wrp.preference_embedding
            AS distance,
            'PERSONALIZED'::TEXT
        FROM TaskRequest tr
        JOIN task_request_embeddings tre
            ON tre.task_request_id = tr.id
        JOIN worker_recommendation_profiles wrp
            ON wrp.worker_id = p_worker_id
        WHERE
            tre.embedding IS NOT NULL
            AND tr.id NOT IN
            (
                SELECT
                    o.task_request_id
                FROM Offer o
                WHERE
                    o.worker_id = p_worker_id
            )
        ORDER BY
            distance ASC
        LIMIT
            p_limit;
    ELSE
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            tre.embedding
            <=>
            wce.embedding
            AS distance,
            'CATEGORY_BASED'::TEXT
        FROM TaskRequest tr
        JOIN taskrequestvectorv3 tre
            ON tre.task_request_id = tr.id
        JOIN WorkerCategory wc
            ON wc.worker_id = p_worker_id
        JOIN workercategoryvectorv3 wce
            ON wce.worker_category_id = wc.id
        WHERE
            tre.embedding IS NOT NULL
            AND tr.id NOT IN
            (
                SELECT
                    o.task_request_id
                FROM Offer o
                WHERE
                    o.worker_id = p_worker_id
            )
        ORDER BY
            distance ASC
        LIMIT
            p_limit;
    END IF;
END;
$$;

To speed up the similarity search, we created these HNSW indexes:

CREATE INDEX IF NOT EXISTS
idx_task_request_embeddings_hnsw
ON task_request_embeddings
USING hnsw
( embedding vector_cosine_ops );

CREATE INDEX IF NOT EXISTS
idx_worker_category_embeddings_hnsw
ON workercategoryvectorv3
USING hnsw
( embedding vector_cosine_ops );

We can now use:

SELECT *
FROM recommend_task_requests_for_worker(19, 100);

to get up to 100 recommended task requests for the worker with worker_id = 19, ordered from the smallest distance to the largest.

The ELSE block of recommend_task_requests_for_worker performs a different comparison from the one described above. It implements the fallback strategy, which we discuss in section 2.
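
To make the personalized comparison concrete, here is a minimal Python sketch of what the <=> operator with vector_cosine_ops computes (cosine distance, that is, 1 minus cosine similarity) and of the ranking done by the first branch of the function. The names cosine_distance, recommend and already_offered are hypothetical. Raising an error for zero vectors is a choice made for this sketch, not the database's behavior.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity of two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)


def recommend(profile, task_embeddings, already_offered, limit=50):
    """Rank task request ids by distance to the profile, closest first.

    task_embeddings maps task request id -> embedding; already_offered is
    the set of ids the worker has already made an offer on (the NOT IN filter).
    """
    scored = [
        (cosine_distance(embedding, profile), task_id)
        for task_id, embedding in task_embeddings.items()
        if task_id not in already_offered
    ]
    scored.sort()
    return [(task_id, distance) for distance, task_id in scored[:limit]]
```

A distance of 0 means the two vectors point in the same direction, and the distance grows as they diverge, which is why the function sorts in ascending order.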

Keeping Worker Profiles Up to Date

The worker recommendation profiles must evolve as workers complete more tasks. We started with three completed tasks per worker, and that number grows with every task a worker finishes. We therefore recalculate the worker recommendation profile, as the average of the embeddings of all the worker's completed task requests, every time the worker completes a task. The following function and trigger do this:

CREATE OR REPLACE FUNCTION
recalculate_worker_profile(
    p_worker_id BIGINT
)
RETURNS void
LANGUAGE plpgsql
AS
$$
BEGIN
    INSERT INTO
    worker_recommendation_profiles
    (
        worker_id,
        preference_embedding,
        updated_at
    )
    SELECT
        o.worker_id,
        AVG(tre.embedding),
        CURRENT_TIMESTAMP
    FROM Task t
    JOIN Offer o
        ON o.id = t.offer_id
    JOIN task_request_embeddings tre
        ON tre.task_request_id =
           o.task_request_id
    WHERE
        o.worker_id = p_worker_id
        AND t.status = 'COMPLETED'
    GROUP BY
        o.worker_id
    ON CONFLICT (worker_id)
    DO UPDATE SET
        preference_embedding =
        EXCLUDED.preference_embedding,
        updated_at =
        CURRENT_TIMESTAMP;
END;
$$;

CREATE OR REPLACE FUNCTION
trg_recalculate_worker_profile()
RETURNS trigger
LANGUAGE plpgsql
AS
$$
DECLARE
    v_worker_id BIGINT;
BEGIN
    IF NEW.status = 'COMPLETED'
       AND (TG_OP = 'INSERT' OR OLD.status IS DISTINCT FROM NEW.status)
    THEN
        SELECT o.worker_id
        INTO v_worker_id
        FROM Offer o
        WHERE o.id = NEW.offer_id;
        PERFORM recalculate_worker_profile(v_worker_id);
    END IF;
    RETURN NEW;
END;
$$;

CREATE TRIGGER after_task_refresh_worker_profile
AFTER INSERT OR UPDATE OF status
ON Task
FOR EACH ROW
EXECUTE FUNCTION trg_recalculate_worker_profile();

With this we keep the worker's profile up to date with the worker's completed tasks.
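
The profile computed by recalculate_worker_profile is the element-wise mean of the embeddings of the worker's completed task requests, which is what AVG does on a vector column. A small Python sketch of the same arithmetic, with a hypothetical helper name:

```python
def average_embedding(embeddings):
    """Element-wise mean of equal-length vectors, analogous to AVG(tre.embedding)."""
    embeddings = list(embeddings)
    if not embeddings:
        # No completed tasks: there is nothing to average, so no profile.
        return None
    dim = len(embeddings[0])
    if any(len(e) != dim for e in embeddings):
        raise ValueError("all embeddings must have the same dimension")
    count = len(embeddings)
    return [sum(e[i] for e in embeddings) / count for i in range(dim)]
```

Because every completed task carries the same weight, a single new task shifts the profile less and less as the worker's history grows.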

For this approach to work, every new task request submitted by a client must be embedded before it can participate in the recommendation process.
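
One way to find the task requests that still need an embedding is an anti-join between TaskRequest and task_request_embeddings. The sketch below runs such a query against an in-memory SQLite database, used here only as a stand-in for PostgreSQL so that the example is self-contained. The query, the function names and the idea of a periodic batch job are assumptions for illustration, not part of the scripts described here.

```python
import sqlite3

# Task requests with no matching row in task_request_embeddings.
PENDING_SQL = """
    SELECT tr.id
    FROM TaskRequest tr
    LEFT JOIN task_request_embeddings tre
        ON tre.task_request_id = tr.id
    WHERE tre.task_request_id IS NULL
    ORDER BY tr.id
"""


def pending_task_request_ids(conn):
    """Return the ids of task requests that have no stored embedding yet."""
    return [row[0] for row in conn.execute(PENDING_SQL)]


def demo():
    # SQLite has no vector type, so the embedding is stored as plain text here.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE TaskRequest (id INTEGER PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE task_request_embeddings ("
        "task_request_id INTEGER PRIMARY KEY, embedding TEXT NOT NULL)"
    )
    conn.executemany("INSERT INTO TaskRequest (id) VALUES (?)", [(1,), (2,), (3,)])
    conn.execute("INSERT INTO task_request_embeddings VALUES (2, '[0.1,0.2]')")
    return pending_task_request_ids(conn)
```

An embedding script could run a query like this on a schedule and embed only the ids it returns.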

2. Category-Based Recommendations

If the worker has no completed tasks, we use the experience description that the worker provided when signing up for a category.

We embed each worker_category experience description, which gives one embedding per category per worker. The script embed_worker_category_description.py does this. We also embed the description of each task request with the script embed_task_requests_description.py. The ELSE block of the recommend_task_requests_for_worker function then matches the two sets of embeddings.

Similarly, new worker category descriptions must also be embedded before they can be used for category-based recommendations.
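
The choice between the two strategies can be sketched in Python as follows. The function feed and its parameters are hypothetical. The sketch simplifies in three ways: it uses a single dictionary of task request embeddings for both strategies, it leaves out the filter on existing offers, and it keeps only the closest category for each task request. The last point is a design assumption of this sketch; the SQL function joins against every category of the worker, so it can return one row per task request and category.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cosine similarity; assumes non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (math.hypot(*a) * math.hypot(*b))


def feed(profile, category_embeddings, task_embeddings, limit=50):
    """Return (task_id, distance, recommendation_type) rows, closest first.

    profile is the worker's preference embedding, or None if the worker has
    no completed tasks; category_embeddings holds one vector per category.
    """
    if profile is not None:
        references, kind = [profile], "PERSONALIZED"
    else:
        references, kind = list(category_embeddings), "CATEGORY_BASED"
    if not references:
        return []  # neither a profile nor a category description is available
    rows = [
        (task_id, min(cosine_distance(emb, ref) for ref in references), kind)
        for task_id, emb in task_embeddings.items()
    ]
    rows.sort(key=lambda row: (row[1], row[0]))
    return rows[:limit]
```

Returning the recommendation type with every row lets the caller tell which strategy produced the feed.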

To store the results of the scripts and to speed up the search, we use these tables and indexes:

CREATE TABLE workercategoryvectorv3 (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    worker_category_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (worker_category_id)
    REFERENCES WorkerCategory(id)
    ON DELETE CASCADE
);

CREATE TABLE taskrequestvectorv3 (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    task_request_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (task_request_id)
    REFERENCES TaskRequest(id)
    ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_worker_category_embeddings_hnsw
ON workercategoryvectorv3
USING hnsw (embedding vector_cosine_ops);

CREATE INDEX IF NOT EXISTS idx_task_request_desc_embeddings_hnsw
ON taskrequestvectorv3
USING hnsw (embedding vector_cosine_ops);
