1 |
|
---|
2 |
|
---|
3 | const debug = require('debug')('log4js:multiprocess');
|
---|
4 | const net = require('net');
|
---|
5 | const LoggingEvent = require('../LoggingEvent');
|
---|
6 |
|
---|
7 | const END_MSG = '__LOG4JS__';
|
---|
8 |
|
---|
/**
 * Creates the master-side appender: a TCP server listening on
 * config.loggerPort / config.loggerHost (falling back to 5000 / 'localhost').
 * Events received from worker sockets are deserialised and passed to
 * actualAppender; events logged locally go straight to actualAppender.
 *
 * @param {Object} config - reads `loggerPort` and `loggerHost`.
 * @param {Function} actualAppender - called with every logging event.
 * @param {Object} levels - `levels.ERROR` is used for socket-error events.
 * @returns {Function} appender function; its `shutdown(cb)` closes the server.
 */
function logServer(config, actualAppender, levels) {
  /**
   * Takes a utf-8 string, returns the deserialised logging event tagged
   * with the remote address and port of the socket it arrived on.
   */
  function deserializeLoggingEvent(clientSocket, msg) {
    debug('(master) deserialising log event');
    const loggingEvent = LoggingEvent.deserialise(msg);
    loggingEvent.remoteAddress = clientSocket.remoteAddress;
    loggingEvent.remotePort = clientSocket.remotePort;

    return loggingEvent;
  }

  /* eslint prefer-arrow-callback:0 */
  const server = net.createServer(function connectionHandler(clientSocket) {
    debug('(master) connection received');
    clientSocket.setEncoding('utf8');
    // Accumulates partial frames between 'data' events for this connection.
    let logMessage = '';

    function logTheMessage(msg) {
      // Guard on the frame itself: the accumulating buffer always contains at
      // least the delimiter here, so checking it would never skip anything.
      if (msg.length > 0) {
        debug('(master) deserialising log event and sending to actual appender');
        actualAppender(deserializeLoggingEvent(clientSocket, msg));
      }
    }

    function chunkReceived(chunk) {
      debug('(master) chunk of data received');
      // 'end' invokes this handler without a chunk.
      logMessage += chunk || '';

      // Iterate rather than recurse: one large chunk may hold many frames.
      let frameStart = 0;
      try {
        let delimiterAt = logMessage.indexOf(END_MSG, frameStart);
        while (delimiterAt > -1) {
          logTheMessage(logMessage.substring(frameStart, delimiterAt));
          frameStart = delimiterAt + END_MSG.length;
          delimiterAt = logMessage.indexOf(END_MSG, frameStart);
        }
      } finally {
        // Keep only what has not been delivered yet, even if the appender threw.
        logMessage = logMessage.substring(frameStart);
      }
    }

    function handleError(error) {
      const loggingEvent = {
        startTime: new Date(),
        categoryName: 'log4js',
        level: levels.ERROR,
        data: ['A worker log process hung up unexpectedly', error],
        remoteAddress: clientSocket.remoteAddress,
        remotePort: clientSocket.remotePort
      };
      actualAppender(loggingEvent);
    }

    clientSocket.on('data', chunkReceived);
    clientSocket.on('end', chunkReceived);
    clientSocket.on('error', handleError);
  });

  server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function (e) {
    debug('(master) master server listening, error was ', e);
    // allow the process to exit, if this is the only socket active
    server.unref();
  });

  function app(event) {
    debug('(master) log event sent directly to actual appender (local event)');
    return actualAppender(event);
  }

  app.shutdown = function (cb) {
    debug('(master) master shutdown called, closing server');
    server.close(cb);
  };

  return app;
}
89 |
|
---|
/**
 * Creates the worker-side appender. It connects to config.loggerHost /
 * config.loggerPort (falling back to 'localhost' / 5000) and writes each
 * event as its serialised form followed by END_MSG. Events logged while the
 * socket is not connected are buffered and flushed on the next 'connect'.
 *
 * @param {Object} config - reads `loggerPort` and `loggerHost`.
 * @returns {Function} appender function; its `shutdown(cb)` ends the socket,
 *   retrying up to three times at 100ms intervals while the buffer is non-empty.
 */
function workerAppender(config) {
  let canWrite = false;
  const buffer = [];
  let socket;
  let shutdownAttempts = 3;

  function write(loggingEvent) {
    debug('(worker) Writing log event to socket');
    socket.write(loggingEvent.serialise(), 'utf8');
    socket.write(END_MSG, 'utf8');
  }

  function emptyBuffer() {
    debug('(worker) emptying worker buffer');
    while (buffer.length > 0) {
      write(buffer.shift());
    }
  }

  function createSocket() {
    debug(
      `(worker) worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`
    );
    socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
    socket.on('connect', () => {
      debug('(worker) worker socket connected');
      emptyBuffer();
      canWrite = true;
    });
    socket.on('timeout', socket.end.bind(socket));
    // An 'error' event with no listener is thrown by the emitter, which would
    // take the worker down; reconnection itself is driven by 'close'.
    socket.on('error', (e) => {
      debug('(worker) worker socket error', e);
      canWrite = false;
    });
    socket.on('close', () => {
      // Buffer events until the replacement socket reports 'connect'.
      canWrite = false;
      createSocket();
    });
  }

  createSocket();

  function log(loggingEvent) {
    if (canWrite) {
      write(loggingEvent);
    } else {
      debug('(worker) worker buffering log event because it cannot write at the moment');
      buffer.push(loggingEvent);
    }
  }
  log.shutdown = function (cb) {
    debug('(worker) worker shutdown called');
    if (buffer.length && shutdownAttempts) {
      debug('(worker) worker buffer has items, waiting 100ms to empty');
      shutdownAttempts -= 1;
      setTimeout(() => {
        log.shutdown(cb);
      }, 100);
    } else {
      // Drop the reconnect-on-close listener so ending the socket is final.
      socket.removeAllListeners('close');
      socket.end(cb);
    }
  };
  return log;
}
151 |
|
---|
/**
 * Picks the appender implementation for this process: the log server when
 * config.mode is 'master', the socket-writing worker appender otherwise.
 *
 * @param {Object} config - appender configuration; `mode` selects the role.
 * @param {Function} appender - actual appender, only used in master mode.
 * @param {Object} levels - level registry, only used in master mode.
 * @returns {Function} the appender function for this process.
 */
function createAppender(config, appender, levels) {
  const isMaster = config.mode === 'master';

  debug(isMaster ? 'Creating master appender' : 'Creating worker appender');

  return isMaster ? logServer(config, appender, levels) : workerAppender(config);
}
161 |
|
---|
/**
 * Entry point used to configure this appender. In master mode it resolves
 * config.appender through findAppender and fails if it is missing or unknown;
 * in any other mode no actual appender is looked up.
 *
 * @param {Object} config - reads `mode` and, in master mode, `appender`.
 * @param {Object} layouts - not used by this appender.
 * @param {Function} findAppender - resolves an appender name to an appender.
 * @param {Object} levels - passed through to createAppender.
 * @returns {Function} the appender returned by createAppender.
 * @throws {Error} in master mode when `config.appender` is not set or not found.
 */
function configure(config, layouts, findAppender, levels) {
  debug(`configure with mode = ${config.mode}`);

  // Workers need no actual appender: hand over straight away.
  if (config.mode !== 'master') {
    return createAppender(config, undefined, levels);
  }

  if (!config.appender) {
    debug(`no appender found in config ${config}`);
    throw new Error('multiprocess master must have an "appender" defined');
  }

  debug(`actual appender is ${config.appender}`);
  const target = findAppender(config.appender);
  if (!target) {
    debug(`actual appender "${config.appender}" not found`);
    throw new Error(`multiprocess master appender "${config.appender}" not defined`);
  }

  return createAppender(config, target, levels);
}
179 |
|
---|
180 | module.exports.configure = configure;
|
---|