diff --git a/labs/bbb-webrtc-sfu/lib/ProcessManager.js b/labs/bbb-webrtc-sfu/lib/ProcessManager.js new file mode 100644 index 0000000000000000000000000000000000000000..d46d6fc4b36ff1dc58e348aa58c906f5cfb0c5ad --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/ProcessManager.js @@ -0,0 +1,118 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +'use strict'; + +const cp = require('child_process'); +const Logger = require('./utils/Logger'); +const SCREENSHARE_PATH = './lib/screenshare/ScreenshareProcess'; +const VIDEO_PATH = './lib/video/VideoProcess.js'; + +module.exports = class ProcessManager { + constructor() { + this.screensharePid; + this.videoPid; + this.screenshareProcess; + this.videoProcess; + this.processes = {}; + this.runningState = "RUNNING"; + } + + async start () { + let screenshareProcess = this.startProcess(SCREENSHARE_PATH); + + let videoProcess = this.startProcess(VIDEO_PATH); + + this.processes[screenshareProcess.pid] = screenshareProcess; + this.processes[videoProcess.pid] = videoProcess; + + process.on('SIGTERM', async () => { + await this.finishChildProcesses(); + process.exit(0); + }); + + process.on('SIGINT', async () => { + await this.finishChildProcesses(); + process.exit(0); + }); + + process.on('uncaughtException', async (error) => { + Logger.error('[ProcessManager] Uncaught exception', error.stack); + await this.finishChildProcesses(); + process.exit('1'); + }); + + // Added this listener to identify unhandled promises, but we should start making + // sense of those as we find them + process.on('unhandledRejection', (reason, p) => { + Logger.error('[ProcessManager] Unhandled Rejection at: Promise', p, 'reason:', reason); + }); + } + + startProcess (processPath) { + Logger.info("[ProcessManager] Starting process at path", processPath); + let proc = cp.fork(processPath, { + // Pass over all of the environment. + env: process.ENV, + // Share stdout/stderr, so we can hear the inevitable errors. + silent: false + }); + + proc.path = processPath; + + proc.on('message', this.onMessage); + proc.on('error', this.onError); + + proc.on('disconnect', (error) => { + Logger.info('[ProcessManager] Received disconnect event from child process with PID', this.pid, ', killing it'); + let processId = proc.pid; + + proc.kill(); + if (this.runningState === 'RUNNING') { + Logger.info('[ProcessManager] Restarting process', processId, 'because server is still running...'); + this.restartProcess(processId); + } + }); + + return proc; + } + + restartProcess (pid) { + let proc = this.processes[pid]; + if (proc) { + let newProcess = this.startProcess(proc.path); + this.processes[newProcess.pid] = newProcess; + delete this.processes[pid]; + } + } + + onMessage (message) { + Logger.info('[ProcessManager] Received child message from', this.pid, message); + } + + onError (e) { + Logger.error('[ProcessManager] Received child error', this.pid, e); + } + + onDisconnect (e) { + } + + async finishChildProcesses () { + this.runningState = "STOPPING"; + + for (var proc in this.processes) { + if (this.processes.hasOwnProperty(proc)) { + let procObj = this.processes[proc]; + if (procObj.kill === 'function' && !procObj.killed) { + await procObj.disconnect() + } + } + } + + this.runningState = "STOPPED"; + } +} diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js index d57795f299e00863de3763b033d9e3bc2b0818f6..3dc2693519ddb67ff00edd3453b81e812476356d 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js @@ -49,6 +49,11 @@ exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange" exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream" exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate" +// Error codes +exports.ERROR = {}; +exports.ERROR.MEDIA_SERVER_OFFLINE = "1000"; +exports.ERROR.MEDIA_SERVER_ERROR = "1001"; + // RTP params exports.SDP = {}; exports.SDP.PARAMS = "params" diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js index 776dac4d96bad2a5b7b95333035ac4b865d07eb1..d70a7bbffed5b0663313873633af893b5d268f19 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js @@ -27,13 +27,22 @@ module.exports = class MediaServer extends EventEmitter { async init () { if (typeof this._mediaServer === 'undefined' || !this._mediaServer) { this._mediaServer = await this._getMediaServerClient(this._serverUri); + Logger.info("[mcs-media] Retrieved media server client => " + this._mediaServer); + + this._mediaServer.on('disconnect', (err) => { + Logger.error('[mcs-media] Media server was disconnected for some reason, will have to clean up all elements and notify users'); + this._destroyElements(); + this._destroyMediaServer(); + this.emit(C.ERROR.MEDIA_SERVER_OFFLINE); + }); } } _getMediaServerClient (serverUri) { return new Promise((resolve, reject) => { - mediaServerClient(serverUri, (error, client) => { + mediaServerClient(serverUri, {failAfter: 3}, (error, client) => { if (error) { + error = this._handleError(error); reject(error); } resolve(client); @@ -41,30 +50,32 @@ module.exports = class MediaServer extends EventEmitter { }); } - _getMediaPipeline (conference) { + _getMediaPipeline (roomId) { return new Promise((resolve, reject) => { - if (this._mediaPipelines[conference]) { - Logger.warn('[mcs-media] Pipeline for', conference, ' already exists.'); - resolve(this._mediaPipelines[conference]); + if (this._mediaPipelines[roomId]) { + Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.'); + resolve(this._mediaPipelines[roomId]); } else { this._mediaServer.create('MediaPipeline', (error, pipeline) => { if (error) { - Logger.error('[mcs-media] Create MediaPipeline returned error', error); + error = this._handleError(error); reject(error); } - this._mediaPipelines[conference] = pipeline; + this._mediaPipelines[roomId] = pipeline; + pipeline.activeElements = 0; resolve(pipeline); }); } }); } - _releasePipeline (pipelineId) { - let mediaPipeline = this._mediaPipelines[pipelineId]; - - if (typeof mediaPipeline !== 'undefined' && typeof mediaPipeline.release === 'function') { - mediaElement.release(); + _releasePipeline (room) { + Logger.debug("[mcs-media] Releasing room", room, "pipeline"); + let pipeline = this._mediaPipelines[room]; + if (pipeline && typeof pipeline.release === 'function') { + pipeline.release(); + delete this._mediaPipelines[room]; } } @@ -72,6 +83,7 @@ module.exports = class MediaServer extends EventEmitter { return new Promise((resolve, reject) => { pipeline.create(type, (error, mediaElement) => { if (error) { + error = this._handleError(error); return reject(error); } Logger.info("[mcs-media] Created [" + type + "] media element: " + mediaElement.id); @@ -82,14 +94,15 @@ module.exports = class MediaServer extends EventEmitter { } - async createMediaElement (conference, type) { + async createMediaElement (roomId, type) { try { - const pipeline = await this._getMediaPipeline(conference); + const pipeline = await this._getMediaPipeline(roomId); const mediaElement = await this._createElement(pipeline, type); + this._mediaPipelines[roomId].activeElements++; return Promise.resolve(mediaElement.id); } catch (err) { - return Promise.reject(new Error(err)); + return Promise.reject(err); } } @@ -103,6 +116,7 @@ module.exports = class MediaServer extends EventEmitter { case 'ALL': source.connect(sink, (error) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(); @@ -111,9 +125,18 @@ module.exports = class MediaServer extends EventEmitter { case 'AUDIO': + source.connect(sink, 'AUDIO', (error) => { + if (error) { + error = this._handleError(error); + return reject(error); + } + return resolve(); + }); + case 'VIDEO': source.connect(sink, (error) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(); @@ -129,13 +152,24 @@ module.exports = class MediaServer extends EventEmitter { } } - stop (elementId) { + stop (room, elementId) { + Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room); let mediaElement = this._mediaElements[elementId]; - if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { - Logger.info("[mcs-media] Releasing endpoint => " + elementId); + let pipeline = this._mediaPipelines[room]; + if (mediaElement && typeof mediaElement.release === 'function') { + pipeline.activeElements--; + + Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements"); + if (pipeline.activeElements <= 0) { + this._releasePipeline(room); + } + mediaElement.release(); this._mediaElements[elementId] = null; } + else { + Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop"); + } } @@ -150,7 +184,7 @@ module.exports = class MediaServer extends EventEmitter { return Promise.resolve(); } else { - return Promise.reject(new Error("Candidate could not be parsed or media element does not exist")); + return Promise.reject("Candidate could not be parsed or media element does not exist"); } } @@ -162,7 +196,8 @@ module.exports = class MediaServer extends EventEmitter { if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') { mediaElement.gatherCandidates((error) => { if (error) { - return reject(new Error(error)); + error = this._handleError(error); + return reject(error); } Logger.info('[mcs-media] Triggered ICE gathering for ' + elementId); return resolve(); @@ -214,6 +249,7 @@ module.exports = class MediaServer extends EventEmitter { if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') { mediaElement.processOffer(sdpOffer, (error, answer) => { if (error) { + error = this._handleError(error); return reject(error); } return resolve(answer); @@ -269,4 +305,38 @@ module.exports = class MediaServer extends EventEmitter { notifyMediaState (elementId, eventTag, event) { this.emit(C.MEDIA_STATE.MEDIA_EVENT , {elementId, eventTag, event}); } + + _destroyElements() { + for (var pipeline in this._mediaPipelines) { + if (this._mediaPipelines.hasOwnProperty(pipeline)) { + delete this._mediaPipelines[pipeline]; + } + } + + for (var element in this._mediaElements) { + if (this._mediaElements.hasOwnProperty(element)) { + delete this._mediaElements[element]; + } + } + } + + _destroyMediaServer() { + delete this._mediaServer; + } + + _handleError(error) { + // Checking if the error needs to be wrapped into a JS Error instance + if(!isError(error)) { + error = new Error(error); + } + + error.code = C.ERROR.MEDIA_SERVER_ERROR; + Logger.error('[mcs-media] Media Server returned error', error); + } + + // duck + isError(error) { + return error && error.stack && error.message && typeof error.stack === 'string' + && typeof error.message === 'string'; + } }; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js index 1da857d04777447c86256efab1da03fdda3f16ef..00b6d5ff37685778720ca08f888919f706a736ae 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js @@ -70,7 +70,7 @@ module.exports = class SdpSession extends EventEmitter { async stop () { this._status = C.STATUS.STOPPING; try { - await this._MediaServer.stop(this._mediaElement); + await this._MediaServer.stop(this.room, this._mediaElement); this._status = C.STATUS.STOPPED; Logger.info("[mcs-sdp-session] Session ", this.id, " is going to stop..."); this.emit('SESSION_STOPPED', this.id); diff --git a/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareProcess.js b/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareProcess.js index a53152f4f7142f14af315c7c6a4aeb7bff57fed6..cf5929aef14ec7b3f8a148e494e6438d016d753f 100644 --- a/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareProcess.js +++ b/labs/bbb-webrtc-sfu/lib/screenshare/ScreenshareProcess.js @@ -1,7 +1,10 @@ +'use strict'; + const ScreenshareManager = require('./ScreenshareManager'); const Logger = require('../utils/Logger'); const config = require('config'); + if (config.get('acceptSelfSignedCertificate')) { process.env.NODE_TLS_REJECT_UNAUTHORIZED=0; } @@ -12,7 +15,11 @@ process.on('uncaughtException', (error) => { Logger.error('[ScreenshareProcess] Uncaught exception ', error.stack); }); -process.on('disconnect', c.stopAll); +process.on('disconnect', async () => { + Logger.warn('[ScreenshareProcess] Parent process exited, cleaning things up and finishing child now...'); + await c.stopAll(); + process.exit(0); +}); // Added this listener to identify unhandled promises, but we should start making // sense of those as we find them diff --git a/labs/bbb-webrtc-sfu/lib/video/VideoManager.js b/labs/bbb-webrtc-sfu/lib/video/VideoManager.js index ef4eadf257ebed94efd14392f4f39c46a09bd8b0..718593aa466201a583eb633ef953f867023276d8 100755 --- a/labs/bbb-webrtc-sfu/lib/video/VideoManager.js +++ b/labs/bbb-webrtc-sfu/lib/video/VideoManager.js @@ -209,6 +209,3 @@ let logAvailableSessions = function() { } } - -process.on('SIGTERM', stopAll); -process.on('SIGINT', stopAll); diff --git a/labs/bbb-webrtc-sfu/lib/video/VideoProcess.js b/labs/bbb-webrtc-sfu/lib/video/VideoProcess.js index 1e06fea87527f0fcedd0040df0c4508c9399ab73..116bcf16ab6044f2239bbda27cbcc842db6a72fb 100644 --- a/labs/bbb-webrtc-sfu/lib/video/VideoProcess.js +++ b/labs/bbb-webrtc-sfu/lib/video/VideoProcess.js @@ -1,3 +1,5 @@ +'use strict'; + const Logger = require('../utils/Logger'); const config = require('config'); @@ -13,8 +15,11 @@ process.on('uncaughtException', (error) => { Logger.error('[VideoProcess] Uncaught exception ', error.stack); }); -process.on('disconnect', () => { - Logger.info("[VideoProcess] Parent process disconnected!"); +process.on('disconnect', async () => { + Logger.warn('[VideoProcess] Parent process exited, cleaning things up and finishing child now...'); + //TODO below + //async VideoManager.stopAll(); + process.exit(0); }); // Added this listener to identify unhandled promises, but we should start making diff --git a/labs/bbb-webrtc-sfu/server.js b/labs/bbb-webrtc-sfu/server.js index 27c1b10e1c056a162b089c62fc79981f527d848b..0e04fad4c4dd03f72d06bd2864b229fb76ab1a20 100755 --- a/labs/bbb-webrtc-sfu/server.js +++ b/labs/bbb-webrtc-sfu/server.js @@ -5,64 +5,19 @@ * */ +'use strict'; + const ConnectionManager = require('./lib/connection-manager/ConnectionManager'); const HttpServer = require('./lib/connection-manager/HttpServer'); const server = new HttpServer(); const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager'); -const cp = require('child_process'); const Logger = require('./lib/utils/Logger'); +const ProcessManager = require('./lib/ProcessManager.js'); +const PM = new ProcessManager(); -let screenshareProc = cp.fork('./lib/screenshare/ScreenshareProcess', { - // Pass over all of the environment. - env: process.ENV, - // Share stdout/stderr, so we can hear the inevitable errors. - silent: false -}); - -let videoProc = cp.fork('./lib/video/VideoProcess.js', { - // Pass over all of the environment. - env: process.ENV, - // Share stdout/stderr, so we can hear the inevitable errors. - silent: false -}); - -let onMessage = function (message) { - Logger.info('event','child message',this.pid,message); -}; - -let onError = function(e) { - Logger.error('event','child error',this.pid,e); -}; - -let onDisconnect = function(e) { - Logger.info(e); - Logger.info('event','child disconnect',this.pid,'killing...'); - this.kill(); -}; - -screenshareProc.on('message',onMessage); -screenshareProc.on('error',onError); -screenshareProc.on('disconnect',onDisconnect); - -videoProc.on('message',onMessage); -videoProc.on('error',onError); -videoProc.on('disconnect',onDisconnect); - -process.on('SIGTERM', process.exit) -process.on('SIGINT', process.exit) - -process.on('uncaughtException', (error) => { - Logger.error('[MainProcess] Uncaught exception', error.stack); - process.exit('1'); -}); - -// Added this listener to identify unhandled promises, but we should start making -// sense of those as we find them -process.on('unhandledRejection', (reason, p) => { - Logger.error('[MainProcess] Unhandled Rejection at: Promise', p, 'reason:', reason); -}); +PM.start(); -const CM = new ConnectionManager(screenshareProc, videoProc); +const CM = new ConnectionManager(PM.screenshareProcess, PM.videoProcess); let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/bbb-webrtc-sfu');