source: trip-planner-front/node_modules/log4js/lib/appenders/multiprocess.js@ 76712b2

Last change on this file since 76712b2 was 6a3a178, checked in by Ema <ema_spirova@…>, 3 years ago

initial commit

  • Property mode set to 100644
File size: 5.3 KB
Line 
1
2
3const debug = require('debug')('log4js:multiprocess');
4const net = require('net');
5const LoggingEvent = require('../LoggingEvent');
6
7const END_MSG = '__LOG4JS__';
8
9/**
10 * Creates a server, listening on config.loggerPort, config.loggerHost.
11 * Output goes to config.actualAppender (config.appender is used to
12 * set up that appender).
13 */
14function logServer(config, actualAppender, levels) {
15 /**
16 * Takes a utf-8 string, returns an object with
17 * the correct log properties.
18 */
19 function deserializeLoggingEvent(clientSocket, msg) {
20 debug('(master) deserialising log event');
21 const loggingEvent = LoggingEvent.deserialise(msg);
22 loggingEvent.remoteAddress = clientSocket.remoteAddress;
23 loggingEvent.remotePort = clientSocket.remotePort;
24
25 return loggingEvent;
26 }
27
28 /* eslint prefer-arrow-callback:0 */
29 const server = net.createServer(function connectionHandler(clientSocket) {
30 debug('(master) connection received');
31 clientSocket.setEncoding('utf8');
32 let logMessage = '';
33
34 function logTheMessage(msg) {
35 if (logMessage.length > 0) {
36 debug('(master) deserialising log event and sending to actual appender');
37 actualAppender(deserializeLoggingEvent(clientSocket, msg));
38 }
39 }
40
41 function chunkReceived(chunk) {
42 debug('(master) chunk of data received');
43 let event;
44 logMessage += chunk || '';
45 if (logMessage.indexOf(END_MSG) > -1) {
46 event = logMessage.substring(0, logMessage.indexOf(END_MSG));
47 logTheMessage(event);
48 logMessage = logMessage.substring(event.length + END_MSG.length) || '';
49 // check for more, maybe it was a big chunk
50 chunkReceived();
51 }
52 }
53
54 function handleError(error) {
55 const loggingEvent = {
56 startTime: new Date(),
57 categoryName: 'log4js',
58 level: levels.ERROR,
59 data: ['A worker log process hung up unexpectedly', error],
60 remoteAddress: clientSocket.remoteAddress,
61 remotePort: clientSocket.remotePort
62 };
63 actualAppender(loggingEvent);
64 }
65
66 clientSocket.on('data', chunkReceived);
67 clientSocket.on('end', chunkReceived);
68 clientSocket.on('error', handleError);
69 });
70
71 server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function (e) {
72 debug('(master) master server listening, error was ', e);
73 // allow the process to exit, if this is the only socket active
74 server.unref();
75 });
76
77 function app(event) {
78 debug('(master) log event sent directly to actual appender (local event)');
79 return actualAppender(event);
80 }
81
82 app.shutdown = function (cb) {
83 debug('(master) master shutdown called, closing server');
84 server.close(cb);
85 };
86
87 return app;
88}
89
90function workerAppender(config) {
91 let canWrite = false;
92 const buffer = [];
93 let socket;
94 let shutdownAttempts = 3;
95
96 function write(loggingEvent) {
97 debug('(worker) Writing log event to socket');
98 socket.write(loggingEvent.serialise(), 'utf8');
99 socket.write(END_MSG, 'utf8');
100 }
101
102 function emptyBuffer() {
103 let evt;
104 debug('(worker) emptying worker buffer');
105 /* eslint no-cond-assign:0 */
106 while ((evt = buffer.shift())) {
107 write(evt);
108 }
109 }
110
111 function createSocket() {
112 debug(
113 `(worker) worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`
114 );
115 socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
116 socket.on('connect', () => {
117 debug('(worker) worker socket connected');
118 emptyBuffer();
119 canWrite = true;
120 });
121 socket.on('timeout', socket.end.bind(socket));
122 // don't bother listening for 'error', 'close' gets called after that anyway
123 socket.on('close', createSocket);
124 }
125
126 createSocket();
127
128 function log(loggingEvent) {
129 if (canWrite) {
130 write(loggingEvent);
131 } else {
132 debug('(worker) worker buffering log event because it cannot write at the moment');
133 buffer.push(loggingEvent);
134 }
135 }
136 log.shutdown = function (cb) {
137 debug('(worker) worker shutdown called');
138 if (buffer.length && shutdownAttempts) {
139 debug('(worker) worker buffer has items, waiting 100ms to empty');
140 shutdownAttempts -= 1;
141 setTimeout(() => {
142 log.shutdown(cb);
143 }, 100);
144 } else {
145 socket.removeAllListeners('close');
146 socket.end(cb);
147 }
148 };
149 return log;
150}
151
152function createAppender(config, appender, levels) {
153 if (config.mode === 'master') {
154 debug('Creating master appender');
155 return logServer(config, appender, levels);
156 }
157
158 debug('Creating worker appender');
159 return workerAppender(config);
160}
161
162function configure(config, layouts, findAppender, levels) {
163 let appender;
164 debug(`configure with mode = ${config.mode}`);
165 if (config.mode === 'master') {
166 if (!config.appender) {
167 debug(`no appender found in config ${config}`);
168 throw new Error('multiprocess master must have an "appender" defined');
169 }
170 debug(`actual appender is ${config.appender}`);
171 appender = findAppender(config.appender);
172 if (!appender) {
173 debug(`actual appender "${config.appender}" not found`);
174 throw new Error(`multiprocess master appender "${config.appender}" not defined`);
175 }
176 }
177 return createAppender(config, appender, levels);
178}
179
180module.exports.configure = configure;
Note: See TracBrowser for help on using the repository browser.