const debug = require('debug')('log4js:multiprocess');
const net = require('net');
const LoggingEvent = require('../LoggingEvent');

// Delimiter written by the worker after every serialised event; the master
// uses it to split the incoming byte stream back into individual events.
const END_MSG = '__LOG4JS__';

/**
 * Creates a TCP server listening on config.loggerPort / config.loggerHost
 * (defaulting to 5000 / 'localhost'). Every log event received from a worker
 * is passed to `actualAppender`.
 *
 * @param {Object} config - reads `loggerPort` and `loggerHost`.
 * @param {Function} actualAppender - called with each logging event.
 * @param {Object} levels - only `levels.ERROR` is read, for socket-error events.
 * @returns {Function} an appender that forwards local events straight to
 *   `actualAppender`, with a `shutdown(cb)` method that closes the server.
 */
function logServer(config, actualAppender, levels) {
  /**
   * Takes a utf-8 string and returns the logging event produced by
   * LoggingEvent.deserialise, tagged with the sending socket's remote
   * address and port.
   */
  function deserializeLoggingEvent(clientSocket, msg) {
    debug('(master) deserialising log event');
    const loggingEvent = LoggingEvent.deserialise(msg);
    loggingEvent.remoteAddress = clientSocket.remoteAddress;
    loggingEvent.remotePort = clientSocket.remotePort;

    return loggingEvent;
  }

  /* eslint prefer-arrow-callback:0 */
  const server = net.createServer(function connectionHandler(clientSocket) {
    debug('(master) connection received');
    clientSocket.setEncoding('utf8');
    // Data received so far that has not yet been terminated by END_MSG.
    let logMessage = '';

    // Forwards one framed message to the actual appender; empty frames are
    // skipped rather than handed to the deserialiser.
    function logTheMessage(msg) {
      if (msg.length > 0) {
        debug('(master) deserialising log event and sending to actual appender');
        actualAppender(deserializeLoggingEvent(clientSocket, msg));
      }
    }

    // Handles both 'data' (chunk is a string) and 'end' (chunk is undefined).
    function chunkReceived(chunk) {
      debug('(master) chunk of data received');
      logMessage += chunk || '';

      // A single chunk may contain many complete events. Drain them in a loop
      // (not by recursion) so a large chunk cannot exhaust the call stack.
      let endIndex = logMessage.indexOf(END_MSG);
      while (endIndex > -1) {
        const event = logMessage.substring(0, endIndex);
        logTheMessage(event);
        logMessage = logMessage.substring(endIndex + END_MSG.length);
        endIndex = logMessage.indexOf(END_MSG);
      }
    }

    // Reports a client socket error to the actual appender as an ERROR event.
    function handleError(error) {
      const loggingEvent = {
        startTime: new Date(),
        categoryName: 'log4js',
        level: levels.ERROR,
        data: ['A worker log process hung up unexpectedly', error],
        remoteAddress: clientSocket.remoteAddress,
        remotePort: clientSocket.remotePort
      };
      actualAppender(loggingEvent);
    }

    clientSocket.on('data', chunkReceived);
    clientSocket.on('end', chunkReceived);
    clientSocket.on('error', handleError);
  });

  server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function (e) {
    debug('(master) master server listening, error was ', e);
    // allow the process to exit, if this is the only socket active
    server.unref();
  });

  function app(event) {
    debug('(master) log event sent directly to actual appender (local event)');
    return actualAppender(event);
  }

  app.shutdown = function (cb) {
    debug('(master) master shutdown called, closing server');
    server.close(cb);
  };

  return app;
}

/**
 * Creates an appender that serialises log events and sends them over TCP to
 * config.loggerHost:config.loggerPort (defaulting to 'localhost':5000).
 * Events logged while the socket is not connected are buffered and flushed
 * on the next successful connect.
 *
 * @param {Object} config - reads `loggerPort` and `loggerHost`.
 * @returns {Function} the appender, with a `shutdown(cb)` method.
 */
function workerAppender(config) {
  // True only between a socket's 'connect' and its next 'error'/'close'.
  let canWrite = false;
  const buffer = [];
  let socket;
  // Number of 100ms retries shutdown() allows for the buffer to drain.
  let shutdownAttempts = 3;

  // Sends one event followed by the END_MSG delimiter.
  function write(loggingEvent) {
    debug('(worker) Writing log event to socket');
    socket.write(loggingEvent.serialise(), 'utf8');
    socket.write(END_MSG, 'utf8');
  }

  function emptyBuffer() {
    let evt;
    debug('(worker) emptying worker buffer');
    /* eslint no-cond-assign:0 */
    while ((evt = buffer.shift())) {
      write(evt);
    }
  }

  function createSocket() {
    debug(
      `(worker) worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`
    );
    socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
    socket.on('connect', () => {
      debug('(worker) worker socket connected');
      emptyBuffer();
      canWrite = true;
    });
    socket.on('timeout', socket.end.bind(socket));
    // An 'error' event with no listener is thrown by the EventEmitter and
    // would crash the worker (e.g. when the master is not up yet). Handle it
    // here; reconnection is still driven by the 'close' event that follows.
    socket.on('error', (e) => {
      debug('(worker) worker socket error', e);
      canWrite = false;
    });
    socket.on('close', () => {
      // Buffer events until the replacement socket reports 'connect'.
      // NOTE(review): reconnection is immediate with no back-off, matching
      // the original behaviour.
      canWrite = false;
      createSocket();
    });
  }

  createSocket();

  function log(loggingEvent) {
    if (canWrite) {
      write(loggingEvent);
    } else {
      debug('(worker) worker buffering log event because it cannot write at the moment');
      buffer.push(loggingEvent);
    }
  }

  // Waits up to shutdownAttempts * 100ms for buffered events to be flushed,
  // then stops reconnecting and ends the socket.
  log.shutdown = function (cb) {
    debug('(worker) worker shutdown called');
    if (buffer.length && shutdownAttempts) {
      debug('(worker) worker buffer has items, waiting 100ms to empty');
      shutdownAttempts -= 1;
      setTimeout(() => {
        log.shutdown(cb);
      }, 100);
    } else {
      // Drop the 'close' listener so ending the socket does not reconnect.
      socket.removeAllListeners('close');
      socket.end(cb);
    }
  };
  return log;
}

/**
 * Builds the master appender when config.mode is 'master', otherwise the
 * worker appender.
 */
function createAppender(config, appender, levels) {
  if (config.mode === 'master') {
    debug('Creating master appender');
    return logServer(config, appender, levels);
  }

  debug('Creating worker appender');
  return workerAppender(config);
}

/**
 * Appender entry point. In master mode, looks up the appender named by
 * config.appender via `findAppender` and wraps it in a log server; in any
 * other mode returns a worker appender.
 *
 * @param {Object} config - reads `mode`, `appender`, `loggerPort`, `loggerHost`.
 * @param {*} layouts - unused by this appender.
 * @param {Function} findAppender - resolves an appender name to an appender.
 * @param {Object} levels - passed through to the master appender.
 * @throws {Error} in master mode when config.appender is missing or cannot
 *   be resolved by `findAppender`.
 */
function configure(config, layouts, findAppender, levels) {
  let appender;
  debug(`configure with mode = ${config.mode}`);
  if (config.mode === 'master') {
    if (!config.appender) {
      debug(`no appender found in config ${config}`);
      throw new Error('multiprocess master must have an "appender" defined');
    }
    debug(`actual appender is ${config.appender}`);
    appender = findAppender(config.appender);
    if (!appender) {
      debug(`actual appender "${config.appender}" not found`);
      throw new Error(`multiprocess master appender "${config.appender}" not defined`);
    }
  }
  return createAppender(config, appender, levels);
}

module.exports.configure = configure;