[6a3a178] | 1 |
|
---|
| 2 |
|
---|
| 3 | const debug = require('debug')('log4js:multiprocess');
|
---|
| 4 | const net = require('net');
|
---|
| 5 | const LoggingEvent = require('../LoggingEvent');
|
---|
| 6 |
|
---|
| 7 | const END_MSG = '__LOG4JS__';
|
---|
| 8 |
|
---|
/**
 * Creates the master-side TCP log server.
 *
 * The server listens on config.loggerPort (default 5000) and config.loggerHost
 * (default 'localhost'). Workers send serialised logging events, each followed
 * by END_MSG; every complete event is deserialised, tagged with the sender's
 * remote address and port, and handed to actualAppender.
 *
 * @param {Object} config - reads loggerPort and loggerHost.
 * @param {Function} actualAppender - called with each logging event.
 * @param {Object} levels - must provide ERROR, used for socket-error events.
 * @returns {Function} appender that forwards local events straight to
 *   actualAppender; its shutdown(cb) method closes the server.
 */
function logServer(config, actualAppender, levels) {
  /**
   * Takes a utf-8 string, returns a logging event annotated with the
   * remote address and port of the socket it arrived on.
   */
  function deserializeLoggingEvent(clientSocket, msg) {
    debug('(master) deserialising log event');
    const loggingEvent = LoggingEvent.deserialise(msg);
    loggingEvent.remoteAddress = clientSocket.remoteAddress;
    loggingEvent.remotePort = clientSocket.remotePort;

    return loggingEvent;
  }

  /* eslint prefer-arrow-callback:0 */
  const server = net.createServer(function connectionHandler(clientSocket) {
    debug('(master) connection received');
    clientSocket.setEncoding('utf8');
    // Data received on this connection that has not yet been terminated by END_MSG.
    let pending = '';

    function logTheMessage(msg) {
      // Guard on the extracted frame itself: the pending buffer always still
      // holds the delimiter at this point, so testing it would never skip anything.
      if (msg.length > 0) {
        debug('(master) deserialising log event and sending to actual appender');
        actualAppender(deserializeLoggingEvent(clientSocket, msg));
      }
    }

    // Also registered for 'end', where it is invoked without a chunk.
    function chunkReceived(chunk) {
      debug('(master) chunk of data received');
      pending += chunk || '';
      let consumed = 0;
      try {
        // Iterate (rather than recurse) over every complete frame, so that a
        // chunk carrying many events cannot exhaust the call stack.
        let delimiterAt = pending.indexOf(END_MSG);
        while (delimiterAt > -1) {
          logTheMessage(pending.substring(consumed, delimiterAt));
          consumed = delimiterAt + END_MSG.length;
          delimiterAt = pending.indexOf(END_MSG, consumed);
        }
      } finally {
        // Drop only the frames that were fully handled; if the appender threw,
        // the failing frame stays buffered, as it did before.
        if (consumed > 0) {
          pending = pending.substring(consumed);
        }
      }
    }

    function handleError(error) {
      const loggingEvent = {
        startTime: new Date(),
        categoryName: 'log4js',
        level: levels.ERROR,
        data: ['A worker log process hung up unexpectedly', error],
        remoteAddress: clientSocket.remoteAddress,
        remotePort: clientSocket.remotePort
      };
      actualAppender(loggingEvent);
    }

    clientSocket.on('data', chunkReceived);
    clientSocket.on('end', chunkReceived);
    clientSocket.on('error', handleError);
  });

  server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function (e) {
    debug('(master) master server listening, error was ', e);
    // allow the process to exit, if this is the only socket active
    server.unref();
  });

  function app(event) {
    debug('(master) log event sent directly to actual appender (local event)');
    return actualAppender(event);
  }

  app.shutdown = function (cb) {
    debug('(master) master shutdown called, closing server');
    server.close(cb);
  };

  return app;
}
|
---|
| 89 |
|
---|
/**
 * Creates the worker-side appender, which ships logging events to the master
 * over a TCP socket (config.loggerPort, default 5000; config.loggerHost,
 * default 'localhost').
 *
 * Events logged while the socket is not connected are buffered and flushed on
 * the next successful connection. The socket is re-created whenever it closes.
 *
 * @param {Object} config - reads loggerPort and loggerHost.
 * @returns {Function} appender taking a logging event (must provide
 *   serialise()); its shutdown(cb) method ends the socket.
 */
function workerAppender(config) {
  let canWrite = false;
  const buffer = [];
  let socket;
  let shutdownAttempts = 3;

  // Sends one event followed by the END_MSG delimiter.
  function write(loggingEvent) {
    debug('(worker) Writing log event to socket');
    socket.write(loggingEvent.serialise(), 'utf8');
    socket.write(END_MSG, 'utf8');
  }

  // Flushes everything buffered while the socket was unavailable.
  function emptyBuffer() {
    let evt;
    debug('(worker) emptying worker buffer');
    /* eslint no-cond-assign:0 */
    while ((evt = buffer.shift())) {
      write(evt);
    }
  }

  function createSocket() {
    debug(
      `(worker) worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`
    );
    socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
    socket.on('connect', () => {
      debug('(worker) worker socket connected');
      emptyBuffer();
      canWrite = true;
    });
    socket.on('timeout', socket.end.bind(socket));
    // An 'error' event with no listener is thrown by the emitter, so a refused
    // or reset connection would take the worker down. Reconnection itself is
    // left to the 'close' handler below.
    socket.on('error', (e) => {
      debug('(worker) worker socket error', e);
      canWrite = false;
    });
    socket.on('close', () => {
      // Go back to buffering until the replacement socket has connected.
      canWrite = false;
      // NOTE(review): reconnects immediately with no back-off, as before.
      createSocket();
    });
  }

  createSocket();

  function log(loggingEvent) {
    if (canWrite) {
      write(loggingEvent);
    } else {
      debug('(worker) worker buffering log event because it cannot write at the moment');
      buffer.push(loggingEvent);
    }
  }

  // Gives the buffer up to three 100ms grace periods to drain, then stops
  // reconnecting and ends the socket.
  log.shutdown = function (cb) {
    debug('(worker) worker shutdown called');
    if (buffer.length && shutdownAttempts) {
      debug('(worker) worker buffer has items, waiting 100ms to empty');
      shutdownAttempts -= 1;
      setTimeout(() => {
        log.shutdown(cb);
      }, 100);
    } else {
      socket.removeAllListeners('close');
      socket.end(cb);
    }
  };
  return log;
}
|
---|
| 151 |
|
---|
/**
 * Builds the appender matching config.mode: the master log server when the
 * mode is 'master', otherwise the worker appender.
 *
 * @param {Object} config - multiprocess appender configuration.
 * @param {Function} appender - actual appender; only used in master mode.
 * @param {Object} levels - log levels; only used in master mode.
 * @returns {Function} the created appender.
 */
function createAppender(config, appender, levels) {
  const isMaster = config.mode === 'master';
  debug(isMaster ? 'Creating master appender' : 'Creating worker appender');
  return isMaster ? logServer(config, appender, levels) : workerAppender(config);
}
|
---|
| 161 |
|
---|
/**
 * Entry point used to configure the multiprocess appender.
 *
 * In master mode, config.appender names the appender that receives the
 * events; it is resolved through findAppender and must exist. In any other
 * mode no lookup happens and the appender is created without one.
 *
 * @param {Object} config - reads mode and appender.
 * @param {*} layouts - not used by this appender.
 * @param {Function} findAppender - resolves an appender by name.
 * @param {Object} levels - passed through to createAppender.
 * @returns {Function} the appender built by createAppender.
 * @throws {Error} in master mode when config.appender is missing or cannot
 *   be resolved.
 */
function configure(config, layouts, findAppender, levels) {
  let appender;
  debug(`configure with mode = ${config.mode}`);
  if (config.mode === 'master') {
    if (!config.appender) {
      // Pass the object to a debug formatter; interpolating it into a
      // template literal only ever printed "[object Object]".
      debug('no appender found in config %o', config);
      throw new Error('multiprocess master must have an "appender" defined');
    }
    debug(`actual appender is ${config.appender}`);
    appender = findAppender(config.appender);
    if (!appender) {
      debug(`actual appender "${config.appender}" not found`);
      throw new Error(`multiprocess master appender "${config.appender}" not defined`);
    }
  }
  return createAppender(config, appender, levels);
}
|
---|
| 179 |
|
---|
| 180 | module.exports.configure = configure;
|
---|