source: imaps-frontend/node_modules/@parcel/watcher/src/watchman/WatchmanBackend.cc@ 79a0317

main
Last change on this file since 79a0317 was 0c6b92a, checked in by stefan toskovski <stefantoska84@…>, 6 weeks ago

Pred finalna verzija

  • Property mode set to 100644
File size: 8.3 KB
Line 
1#include <string>
2#include <fstream>
3#include <stdlib.h>
4#include <algorithm>
5#include "../DirTree.hh"
6#include "../Event.hh"
7#include "./BSER.hh"
8#include "./WatchmanBackend.hh"
9
10#ifdef _WIN32
11#include "../windows/win_utils.hh"
12#define S_ISDIR(mode) ((mode & _S_IFDIR) == _S_IFDIR)
13#define popen _popen
14#define pclose _pclose
15#else
16#include <sys/stat.h>
17#define normalizePath(dir) dir
18#endif
19
20template<typename T>
21BSER readBSER(T &&do_read) {
22 std::stringstream oss;
23 char buffer[256];
24 int r;
25 int64_t len = -1;
26 do {
27 // Start by reading a minimal amount of data in order to decode the length.
28 // After that, attempt to read the remaining length, up to the buffer size.
29 r = do_read(buffer, len == -1 ? 20 : (len < 256 ? len : 256));
30 oss << std::string(buffer, r);
31
32 if (len == -1) {
33 uint64_t l = BSER::decodeLength(oss);
34 len = l + oss.tellg();
35 }
36
37 len -= r;
38 } while (len > 0);
39
40 return BSER(oss);
41}
42
43std::string getSockPath() {
44 auto var = getenv("WATCHMAN_SOCK");
45 if (var && *var) {
46 return std::string(var);
47 }
48
49 FILE *fp = popen("watchman --output-encoding=bser get-sockname", "r");
50 if (fp == NULL || errno == ECHILD) {
51 throw std::runtime_error("Failed to execute watchman");
52 }
53
54 BSER b = readBSER([fp] (char *buf, size_t len) {
55 return fread(buf, sizeof(char), len, fp);
56 });
57
58 pclose(fp);
59
60 auto objValue = b.objectValue();
61 auto foundSockname = objValue.find("sockname");
62 if (foundSockname == objValue.end()) {
63 throw std::runtime_error("sockname not found");
64 }
65 return foundSockname->second.stringValue();
66}
67
68std::unique_ptr<IPC> watchmanConnect() {
69 std::string path = getSockPath();
70 return std::unique_ptr<IPC>(new IPC(path));
71}
72
73BSER watchmanRead(IPC *ipc) {
74 return readBSER([ipc] (char *buf, size_t len) {
75 return ipc->read(buf, len);
76 });
77}
78
79BSER::Object WatchmanBackend::watchmanRequest(BSER b) {
80 std::string cmd = b.encode();
81 mIPC->write(cmd);
82 mRequestSignal.notify();
83
84 mResponseSignal.wait();
85 mResponseSignal.reset();
86
87 if (!mError.empty()) {
88 std::runtime_error err = std::runtime_error(mError);
89 mError = std::string();
90 throw err;
91 }
92
93 return mResponse;
94}
95
96void WatchmanBackend::watchmanWatch(std::string dir) {
97 std::vector<BSER> cmd;
98 cmd.push_back("watch");
99 cmd.push_back(normalizePath(dir));
100 watchmanRequest(cmd);
101}
102
103bool WatchmanBackend::checkAvailable() {
104 try {
105 watchmanConnect();
106 return true;
107 } catch (std::exception &err) {
108 return false;
109 }
110}
111
112void handleFiles(WatcherRef watcher, BSER::Object obj) {
113 auto found = obj.find("files");
114 if (found == obj.end()) {
115 throw WatcherError("Error reading changes from watchman", watcher);
116 }
117
118 auto files = found->second.arrayValue();
119 for (auto it = files.begin(); it != files.end(); it++) {
120 auto file = it->objectValue();
121 auto name = file.find("name")->second.stringValue();
122 #ifdef _WIN32
123 std::replace(name.begin(), name.end(), '/', '\\');
124 #endif
125 auto mode = file.find("mode")->second.intValue();
126 auto isNew = file.find("new")->second.boolValue();
127 auto exists = file.find("exists")->second.boolValue();
128 auto path = watcher->mDir + DIR_SEP + name;
129 if (watcher->isIgnored(path)) {
130 continue;
131 }
132
133 if (isNew && exists) {
134 watcher->mEvents.create(path);
135 } else if (exists && !S_ISDIR(mode)) {
136 watcher->mEvents.update(path);
137 } else if (!isNew && !exists) {
138 watcher->mEvents.remove(path);
139 }
140 }
141}
142
143void WatchmanBackend::handleSubscription(BSER::Object obj) {
144 std::unique_lock<std::mutex> lock(mMutex);
145 auto subscription = obj.find("subscription")->second.stringValue();
146 auto it = mSubscriptions.find(subscription);
147 if (it == mSubscriptions.end()) {
148 return;
149 }
150
151 auto watcher = it->second;
152 try {
153 handleFiles(watcher, obj);
154 watcher->notify();
155 } catch (WatcherError &err) {
156 handleWatcherError(err);
157 }
158}
159
160void WatchmanBackend::start() {
161 mIPC = watchmanConnect();
162 notifyStarted();
163
164 while (true) {
165 // If there are no subscriptions we are reading, wait for a request.
166 if (mSubscriptions.size() == 0) {
167 mRequestSignal.wait();
168 mRequestSignal.reset();
169 }
170
171 // Break out of loop if we are stopped.
172 if (mStopped) {
173 break;
174 }
175
176 // Attempt to read from the socket.
177 // If there is an error and we are stopped, break.
178 BSER b;
179 try {
180 b = watchmanRead(&*mIPC);
181 } catch (std::exception &err) {
182 if (mStopped) {
183 break;
184 } else if (mResponseSignal.isWaiting()) {
185 mError = err.what();
186 mResponseSignal.notify();
187 } else {
188 // Throwing causes the backend to be destroyed, but we never reach the code below to notify the signal
189 mEndedSignal.notify();
190 throw;
191 }
192 }
193
194 auto obj = b.objectValue();
195 auto error = obj.find("error");
196 if (error != obj.end()) {
197 mError = error->second.stringValue();
198 mResponseSignal.notify();
199 continue;
200 }
201
202 // If this message is for a subscription, handle it, otherwise notify the request.
203 auto subscription = obj.find("subscription");
204 if (subscription != obj.end()) {
205 handleSubscription(obj);
206 } else {
207 mResponse = obj;
208 mResponseSignal.notify();
209 }
210 }
211
212 mEndedSignal.notify();
213}
214
215WatchmanBackend::~WatchmanBackend() {
216 // Mark the watcher as stopped, close the socket, and trigger the lock.
217 // This will cause the read loop to be broken and the thread to exit.
218 mStopped = true;
219 mIPC.reset();
220 mRequestSignal.notify();
221
222 // If not ended yet, wait.
223 mEndedSignal.wait();
224}
225
226std::string WatchmanBackend::clock(WatcherRef watcher) {
227 BSER::Array cmd;
228 cmd.push_back("clock");
229 cmd.push_back(normalizePath(watcher->mDir));
230
231 BSER::Object obj = watchmanRequest(cmd);
232 auto found = obj.find("clock");
233 if (found == obj.end()) {
234 throw WatcherError("Error reading clock from watchman", watcher);
235 }
236
237 return found->second.stringValue();
238}
239
240void WatchmanBackend::writeSnapshot(WatcherRef watcher, std::string *snapshotPath) {
241 std::unique_lock<std::mutex> lock(mMutex);
242 watchmanWatch(watcher->mDir);
243
244 std::ofstream ofs(*snapshotPath);
245 ofs << clock(watcher);
246}
247
248void WatchmanBackend::getEventsSince(WatcherRef watcher, std::string *snapshotPath) {
249 std::unique_lock<std::mutex> lock(mMutex);
250 std::ifstream ifs(*snapshotPath);
251 if (ifs.fail()) {
252 return;
253 }
254
255 watchmanWatch(watcher->mDir);
256
257 std::string clock;
258 ifs >> clock;
259
260 BSER::Array cmd;
261 cmd.push_back("since");
262 cmd.push_back(normalizePath(watcher->mDir));
263 cmd.push_back(clock);
264
265 BSER::Object obj = watchmanRequest(cmd);
266 handleFiles(watcher, obj);
267}
268
269std::string getId(WatcherRef watcher) {
270 std::ostringstream id;
271 id << "parcel-";
272 id << static_cast<void*>(watcher.get());
273 return id.str();
274}
275
276// This function is called by Backend::watch which takes a lock on mMutex
277void WatchmanBackend::subscribe(WatcherRef watcher) {
278 watchmanWatch(watcher->mDir);
279
280 std::string id = getId(watcher);
281 BSER::Array cmd;
282 cmd.push_back("subscribe");
283 cmd.push_back(normalizePath(watcher->mDir));
284 cmd.push_back(id);
285
286 BSER::Array fields;
287 fields.push_back("name");
288 fields.push_back("mode");
289 fields.push_back("exists");
290 fields.push_back("new");
291
292 BSER::Object opts;
293 opts.emplace("fields", fields);
294 opts.emplace("since", clock(watcher));
295
296 if (watcher->mIgnorePaths.size() > 0) {
297 BSER::Array ignore;
298 BSER::Array anyOf;
299 anyOf.push_back("anyof");
300
301 for (auto it = watcher->mIgnorePaths.begin(); it != watcher->mIgnorePaths.end(); it++) {
302 std::string pathStart = watcher->mDir + DIR_SEP;
303 if (it->rfind(pathStart, 0) == 0) {
304 auto relative = it->substr(pathStart.size());
305 BSER::Array dirname;
306 dirname.push_back("dirname");
307 dirname.push_back(relative);
308 anyOf.push_back(dirname);
309 }
310 }
311
312 ignore.push_back("not");
313 ignore.push_back(anyOf);
314
315 opts.emplace("expression", ignore);
316 }
317
318 cmd.push_back(opts);
319 watchmanRequest(cmd);
320
321 mSubscriptions.emplace(id, watcher);
322 mRequestSignal.notify();
323}
324
325// This function is called by Backend::unwatch which takes a lock on mMutex
326void WatchmanBackend::unsubscribe(WatcherRef watcher) {
327 std::string id = getId(watcher);
328 auto erased = mSubscriptions.erase(id);
329
330 if (erased) {
331 BSER::Array cmd;
332 cmd.push_back("unsubscribe");
333 cmd.push_back(normalizePath(watcher->mDir));
334 cmd.push_back(id);
335
336 watchmanRequest(cmd);
337 }
338}
Note: See TracBrowser for help on using the repository browser.