= Recommendation System =

The recommendation system uses a vector database to fill each worker's feed with the task requests that are closest to the tasks the worker usually takes on.

We suggest the task requests that are closest to the worker's past completed tasks. As a fallback, if the worker has not completed any tasks yet, we match task requests against the worker's experience description for the particular category. For this purpose we added an `experience_description` column to the `worker_category` table. It holds the description that the worker provided when signing up for the given category.

The sections below describe how both strategies are implemented.

== 1. Personalized Recommendations ==

This strategy recommends the open task requests that are most similar to the tasks a worker has already completed. We first embedded the open task requests. Then, as a starting point, we embedded three task requests per worker that had resulted in a completed task, which we found sufficient for the initial version of the app. We built these embeddings with the script [attachment:embed_taskrequests.py] and store the vectors in the following table:

{{{
#!sql
CREATE TABLE task_request_embeddings (
    task_request_id INT PRIMARY KEY
        REFERENCES TaskRequest(id)
        ON DELETE CASCADE
        ON UPDATE CASCADE,

    embedding vector(384) NOT NULL,

    embedded_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);
}}}
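
The attached script is not reproduced here. As a minimal sketch of the storage step, assuming the script holds each embedding as a Python list of 384 floats, the helper below (`to_pgvector_literal`, a hypothetical name) formats such a list as the bracketed text literal that pgvector accepts for a `vector` column, and rejects vectors of the wrong length or with non-finite values.

```python
import math

EMBEDDING_DIM = 384  # matches vector(384) in task_request_embeddings


def to_pgvector_literal(values, dim=EMBEDDING_DIM):
    """Format a sequence of numbers as a pgvector text literal, e.g. '[0.1,0.2]'."""
    values = [float(v) for v in values]
    if len(values) != dim:
        raise ValueError(f"expected {dim} dimensions, got {len(values)}")
    if not all(math.isfinite(v) for v in values):
        raise ValueError("embedding contains NaN or infinite values")
    # repr() keeps full float precision; pgvector parses comma-separated numbers.
    return "[" + ",".join(repr(v) for v in values) + "]"
```

With a driver such as psycopg, the returned string could be bound as a query parameter and cast with `::vector` in the `INSERT`. That usage is an assumption about the script, not something taken from it.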

Once we had embedded all the task requests we needed (the completed ones to build the worker profiles and the open ones to recommend), we built the worker recommendation profiles with the Python script [attachment:embed_worker_profiles.py] and stored them in the following table:

{{{
#!sql
CREATE TABLE worker_recommendation_profiles (
    worker_id INT PRIMARY KEY
        REFERENCES Worker(id)
        ON DELETE CASCADE,
    preference_embedding vector(384),
    updated_at TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP
);
}}}

With the embeddings in place, we can compare each worker's recommendation profile with the embeddings of the open task requests and build the worker feed. The function `recommend_task_requests_for_worker` does this:

{{{
#!sql
CREATE OR REPLACE FUNCTION recommend_task_requests_for_worker(
    p_worker_id BIGINT,
    p_limit     INT DEFAULT 50
)
RETURNS TABLE (
    task_request_id     BIGINT,
    distance            DOUBLE PRECISION,
    recommendation_type TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
    -- Personalized branch: the worker already has a preference embedding.
    IF EXISTS (
        SELECT 1
        FROM worker_recommendation_profiles wrp
        WHERE wrp.worker_id = p_worker_id
          AND wrp.preference_embedding IS NOT NULL
    )
    THEN
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            -- <=> is pgvector's cosine distance operator
            tre.embedding <=> wrp.preference_embedding AS distance,
            'PERSONALIZED'::TEXT
        FROM TaskRequest tr
        JOIN task_request_embeddings tre
          ON tre.task_request_id = tr.id
        JOIN worker_recommendation_profiles wrp
          ON wrp.worker_id = p_worker_id
        WHERE tre.embedding IS NOT NULL
          -- Skip task requests the worker has already made an offer on.
          -- NOT EXISTS stays correct even if Offer.task_request_id is NULL.
          AND NOT EXISTS (
              SELECT 1
              FROM Offer o
              WHERE o.worker_id = p_worker_id
                AND o.task_request_id = tr.id
          )
        ORDER BY distance ASC
        LIMIT p_limit;
    ELSE
        -- Fallback branch: compare against the worker's category descriptions.
        RETURN QUERY
        SELECT
            tr.id::BIGINT,
            tre.embedding <=> wce.embedding AS distance,
            'CATEGORY_BASED'::TEXT
        FROM TaskRequest tr
        JOIN taskrequestvectorv3 tre
          ON tre.task_request_id = tr.id
        JOIN WorkerCategory wc
          ON wc.worker_id = p_worker_id
        JOIN workercategoryvectorv3 wce
          ON wce.worker_category_id = wc.id
        WHERE tre.embedding IS NOT NULL
          AND NOT EXISTS (
              SELECT 1
              FROM Offer o
              WHERE o.worker_id = p_worker_id
                AND o.task_request_id = tr.id
          )
        ORDER BY distance ASC
        LIMIT p_limit;
    END IF;
END;
$$;
}}}
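
To make the ranking concrete, here is a small pure-Python sketch of what the personalized branch computes. The `<=>` operator returns the cosine distance, that is, one minus the cosine similarity of the two vectors. The function names and the in-memory dictionaries are illustrative assumptions. The real comparison happens inside PostgreSQL.

```python
import math


def cosine_distance(a, b):
    """Cosine distance (1 - cosine similarity). Assumes neither vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def rank_for_worker(profile, task_embeddings, offered_ids, limit=50):
    """Return (task_request_id, distance) pairs, nearest first.

    task_embeddings maps task_request_id -> embedding. offered_ids holds the
    task requests the worker already made an offer on; they are excluded,
    as in the SQL function.
    """
    scored = [
        (task_id, cosine_distance(embedding, profile))
        for task_id, embedding in task_embeddings.items()
        if task_id not in offered_ids
    ]
    scored.sort(key=lambda pair: pair[1])
    return scored[:limit]
```

A distance of 0 means the task request points in the same direction as the worker's profile, and larger values mean less similar, which is why the SQL orders by `distance ASC`.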

To speed up the search we created the following indexes:

{{{
#!sql
CREATE INDEX IF NOT EXISTS idx_task_request_embeddings_hnsw
    ON task_request_embeddings
    USING hnsw (embedding vector_cosine_ops);

CREATE INDEX IF NOT EXISTS idx_worker_category_embeddings_hnsw
    ON workercategoryvectorv3
    USING hnsw (embedding vector_cosine_ops);
}}}

We can now use:

{{{
#!sql
SELECT *
FROM recommend_task_requests_for_worker(19, 100);
}}}

to get the 100 recommended task requests for the worker with `worker_id = 19`.
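
From application code the same call could look like the sketch below. It assumes a DB-API 2.0 cursor that uses `%s` placeholders, as psycopg does. The helper name `fetch_recommendations` is hypothetical and is not part of the project.

```python
def fetch_recommendations(cursor, worker_id, limit=50):
    """Call recommend_task_requests_for_worker and return the rows as dicts."""
    cursor.execute(
        "SELECT task_request_id, distance, recommendation_type "
        "FROM recommend_task_requests_for_worker(%s, %s)",
        (worker_id, limit),
    )
    return [
        {
            "task_request_id": row[0],
            "distance": row[1],
            "recommendation_type": row[2],
        }
        for row in cursor.fetchall()
    ]
```

The `recommendation_type` value tells the caller which of the two strategies produced the feed.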

The `ELSE` block of `recommend_task_requests_for_worker` uses a different comparison from the one explained above. It is the fallback strategy, which is discussed in section 2.

=== Keeping Worker Profiles Up to Date ===

The worker recommendation profiles have to evolve as workers complete more tasks. We started with three completed tasks per worker, and we recalculate a worker's profile every time that worker completes a task. For this purpose we define:

{{{
#!sql
-- Rebuild one worker's profile as the average embedding of all task
-- requests that led to a completed task for that worker.
CREATE OR REPLACE FUNCTION recalculate_worker_profile(
    p_worker_id BIGINT
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO worker_recommendation_profiles (
        worker_id,
        preference_embedding,
        updated_at
    )
    SELECT
        o.worker_id,
        AVG(tre.embedding),
        CURRENT_TIMESTAMP
    FROM Task t
    JOIN Offer o
      ON o.id = t.offer_id
    JOIN task_request_embeddings tre
      ON tre.task_request_id = o.task_request_id
    WHERE o.worker_id = p_worker_id
      AND t.status = 'COMPLETED'
    GROUP BY o.worker_id
    ON CONFLICT (worker_id)
    DO UPDATE SET
        preference_embedding = EXCLUDED.preference_embedding,
        updated_at = CURRENT_TIMESTAMP;
END;
$$;

-- Trigger function: recalculate the profile when a task becomes COMPLETED.
CREATE OR REPLACE FUNCTION trg_recalculate_worker_profile()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
    v_worker_id BIGINT;
BEGIN
    IF NEW.status = 'COMPLETED'
       AND (TG_OP = 'INSERT' OR OLD.status IS DISTINCT FROM NEW.status)
    THEN
        -- The worker is found through the offer that the task was created from.
        SELECT o.worker_id
        INTO v_worker_id
        FROM Offer o
        WHERE o.id = NEW.offer_id;

        PERFORM recalculate_worker_profile(v_worker_id);
    END IF;
    RETURN NEW;
END;
$$;

CREATE TRIGGER after_task_refresh_worker_profile
    AFTER INSERT OR UPDATE OF status
    ON Task
    FOR EACH ROW
    EXECUTE FUNCTION trg_recalculate_worker_profile();
}}}

This keeps each worker's profile in sync with the worker's completed tasks.
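
The recalculation stores `AVG(tre.embedding)`, the element-wise mean of the embeddings of all task requests the worker has completed. The following sketch shows the same arithmetic in plain Python. The function name is an illustrative assumption, and the `None` result stands in for the case where the SQL statement selects no rows and therefore writes no profile.

```python
def mean_embedding(embeddings):
    """Element-wise mean of equally sized vectors, as AVG does on a vector column."""
    embeddings = [list(e) for e in embeddings]
    if not embeddings:
        # No completed tasks: the SQL inserts nothing, so no profile is written.
        return None
    dim = len(embeddings[0])
    if any(len(e) != dim for e in embeddings):
        raise ValueError("all embeddings must have the same dimension")
    count = len(embeddings)
    return [sum(e[i] for e in embeddings) / count for i in range(dim)]
```

Because every completed task carries the same weight, a profile built from many tasks shifts only slightly when one more task is completed.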

For this to work, every new task request that a client registers has to be embedded as well, which we do with a background job.

== 2. Category-Based Recommendation ==

If the worker has no completed tasks, we use the experience description that the worker provided when signing up for a category.

We embed the `experience_description` of each `worker_category` row with the script [attachment:embed_worker_category_description.py], which gives one embedding per category per worker. We also embed the description of each task request with the script [attachment:embed_task_requests_description.py]. The `ELSE` block of the `recommend_task_requests_for_worker` function then matches the two sets of embeddings.
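
Because the `ELSE` block joins every category row of the worker, a task request is returned once per category the worker signed up for. One way a caller could collapse these rows, shown here as a sketch and not as part of the current implementation, is to keep the smallest distance per task request. The function name is hypothetical.

```python
def best_distance_per_task(rows, limit=50):
    """Collapse (task_request_id, distance) rows to one row per task request.

    For each task request the smallest distance over all of the worker's
    categories is kept, and the result is ordered nearest first.
    """
    best = {}
    for task_id, distance in rows:
        if task_id not in best or distance < best[task_id]:
            best[task_id] = distance
    return sorted(best.items(), key=lambda pair: pair[1])[:limit]
```

The same effect could be achieved in SQL by grouping on the task request and taking `MIN` of the distance.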

Both scripts have to be rerun as new data arrives, which we also do with a background job.

The following tables store the results of the scripts, and the indexes speed up the search:

{{{
#!sql
-- Table names match the ones referenced by the ELSE block of
-- recommend_task_requests_for_worker and by the indexes below.
CREATE TABLE WorkerCategoryVectorv3 (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    worker_category_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (worker_category_id)
        REFERENCES WorkerCategory(id)
        ON DELETE CASCADE
);

CREATE TABLE TaskRequestVectorv3 (
    id INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    task_request_id INT NOT NULL UNIQUE,
    embedding vector(384) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (task_request_id)
        REFERENCES TaskRequest(id)
        ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_worker_category_embeddings_hnsw
    ON WorkerCategoryVectorv3
    USING hnsw (embedding vector_cosine_ops);

CREATE INDEX IF NOT EXISTS idx_task_request_desc_embeddings_hnsw
    ON TaskRequestVectorv3
    USING hnsw (embedding vector_cosine_ops);
}}}

For new task requests and worker category records we run the same scripts as a background job every 5 minutes.
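
Each run of such a job first has to find the rows that do not have an embedding yet. The sketch below shows one way to do that with an anti-join, using the `TaskRequest` and `task_request_embeddings` tables from section 1. So that the example runs on its own, it uses an in-memory SQLite database as a stand-in for PostgreSQL and stores the embedding as text. Both function names are hypothetical, and the attached scripts may select their work differently.

```python
import sqlite3


def find_unembedded_task_requests(conn):
    """Return the ids of task requests with no row in task_request_embeddings."""
    rows = conn.execute(
        """
        SELECT tr.id
        FROM TaskRequest tr
        LEFT JOIN task_request_embeddings tre
               ON tre.task_request_id = tr.id
        WHERE tre.task_request_id IS NULL
        ORDER BY tr.id
        """
    ).fetchall()
    return [row[0] for row in rows]


def make_demo_db():
    """Build an in-memory SQLite stand-in for the two tables (demo data only)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE TaskRequest (id INTEGER PRIMARY KEY);
        CREATE TABLE task_request_embeddings (
            task_request_id INTEGER PRIMARY KEY REFERENCES TaskRequest(id),
            embedding TEXT NOT NULL
        );
        INSERT INTO TaskRequest (id) VALUES (1), (2), (3);
        INSERT INTO task_request_embeddings VALUES (1, '[0.1,0.2]');
        """
    )
    return conn
```

In the demo data only task request 1 has an embedding, so a run would pick up task requests 2 and 3. The same pattern would apply to the description tables used by the fallback strategy.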