diff --git a/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js index 393ea31e0d63dd3736d9669021d866cd4316d1dc..1748ac0fbfbb4815c9de207616c8a5767e208fdb 100644 --- a/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js +++ b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js @@ -8,149 +8,221 @@ "use strict"; const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw'); +const cookieParser = require('cookie-parser') +const express = require('express'); +const session = require('express-session') +const wsModule = require('../websocket'); +const http = require('http'); +const fs = require('fs'); +const MediaController = require('../media-controller'); var Screenshare = require('./screenshare'); var C = require('../bbb/messages/Constants'); // Global variables module.exports = class ScreenshareManager { - constructor (logger) { + + constructor (settings, logger) { this._logger = logger; this._clientId = 0; + this._app = express(); this._sessions = {}; this._screenshareSessions = {}; - this._bbbGW = new BigBlueButtonGW("MANAGER"); - this._redisGateway; + this._setupExpressSession(); + this._setupHttpServer(); } - async start() { - let self = this; + _setupExpressSession() { + this._app.use(cookieParser()); - try { - this._redisGateway = await this._bbbGW.addSubscribeChannel(C.TO_SCREENSHARE); - const transcode = await this._bbbGW.addSubscribeChannel(C.FROM_BBB_TRANSCODE_SYSTEM_CHAN); - this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this)); - process.on('message', this._onMessage.bind(this)); - console.log(' [ScreenshareManager] Successfully subscribed to redis channel'); - } - catch (error) { - console.log(' [ScreenshareManager] Could not connect to transcoder redis channel, finishing app...'); - console.log(error); - self._stopAll(); - } + this._sessionHandler = session({ + secret : 'Shawarma', rolling : true, resave : true, saveUninitialized : true + }); + + this._app.use(this._sessionHandler); } - _onMessage(_message) { - console.log(' [ScreenshareManager] Received message => '); + _setupHttpServer() { let self = this; - let session; - let message = _message; + /* + * Server startup + */ + this._httpServer = http.createServer(this._app).listen(3008, function() { + console.log(' [*] Running node-apps connection manager.'); + }); + + /* + * Management of sessions + */ + this._wss = new wsModule.Server({ + server : this._httpServer, + path : '/kurento-screenshare' + }); + + + // TODO isolate this + this._bbbGW = new BigBlueButtonGW(); + + this._bbbGW.addSubscribeChannel(C.FROM_BBB_TRANSCODE_SYSTEM_CHAN, function(error, redisWrapper) { + if(error) { + console.log(' Could not connect to transcoder redis channel, finishing app...'); + self._stopAll(); + } + console.log(' [server] Successfully subscribed to redis channel'); + }); + + + this._wss.on('connection', self._onNewConnection.bind(self)); + } - // The sessionId is voiceBridge for screensharing sessions - let sessionId = message.voiceBridge; - if(this._screenshareSessions[sessionId]) { - session = this._screenshareSessions[sessionId]; - } + _onNewConnection(webSocket) { + let self = this; + let connectionId; + let request = webSocket.upgradeReq; + let sessionId; + let callerName; + let response = { + writeHead : {} + }; + + this._sessionHandler(request, response, function(err) { + connectionId = request.session.id + "_" + self._clientId++; + console.log('Connection received with connectionId ' + connectionId); + }); + + webSocket.on('error', function(error) { + console.log('Connection ' + connectionId + ' error'); + self._stopSession(sessionId); + }); + + webSocket.on('close', function() { + console.log('Connection ' + connectionId + ' closed'); + console.log(webSocket.presenter); + + if (webSocket.presenter && self._screenshareSessions[sessionId]) { // if presenter // FIXME (this conditional was added to prevent screenshare stop when an iOS user quits) + console.log(" [CM] Stopping presenter " + sessionId); + self._stopSession(sessionId); + } + if (webSocket.viewer && typeof webSocket.session !== 'undefined') { + console.log(" [CM] Stopping viewer " + webSocket.viewerId); + webSocket.session._stopViewer(webSocket.viewerId); + } + }); + + webSocket.on('message', function(_message) { + let message = JSON.parse(_message); + let session; + // The sessionId is voiceBridge for screensharing sessions + sessionId = message.voiceBridge; + if(self._screenshareSessions[sessionId]) { + session = self._screenshareSessions[sessionId]; + webSocket.session = session; + } + + switch (message.id) { + + case 'presenter': + + // Checking if there's already a Screenshare session started + // because we shouldn't overwrite it + webSocket.presenter = true; + + if (!self._screenshareSessions[message.voiceBridge]) { + self._screenshareSessions[message.voiceBridge] = {} + self._screenshareSessions[message.voiceBridge] = session; + } - switch (message.id) { + //session.on('message', self._assembleSessionMessage.bind(self)); + if(session) { + break; + } - case 'presenter': + session = new Screenshare(webSocket, connectionId, self._bbbGW, + sessionId, message.callerName, message.vh, message.vw, + message.internalMeetingId); + + self._screenshareSessions[sessionId] = {} + self._screenshareSessions[sessionId] = session; + + // starts presenter by sending sessionID, websocket and sdpoffer + session._startPresenter(connectionId, webSocket, message.sdpOffer, function(error, sdpAnswer) { + console.log(" Started presenter " + connectionId); + if (error) { + return webSocket.send(JSON.stringify({ + id : 'presenterResponse', + response : 'rejected', + message : error + })); + } + + webSocket.send(JSON.stringify({ + id : 'presenterResponse', + response : 'accepted', + sdpAnswer : sdpAnswer + })); + console.log(" [websocket] Sending presenterResponse \n" + sdpAnswer); + }); + break; - // Checking if there's already a Screenshare session started - // because we shouldn't overwrite it + case 'viewer': + console.log("[viewer] Session output \n " + session); - if (!self._screenshareSessions[message.voiceBridge]) { - self._screenshareSessions[message.voiceBridge] = {} - self._screenshareSessions[message.voiceBridge] = session; - } + webSocket.viewer = true; + webSocket.viewerId = message.callerName; - if(session) { + if (message.sdpOffer && message.voiceBridge) { + if (session) { + session._startViewer(webSocket, message.voiceBridge, message.sdpOffer, message.callerName, self._screenshareSessions[message.voiceBridge]._presenterEndpoint); + } else { + webSocket.sendMessage("voiceBridge not recognized"); + webSocket.sendMessage(Object.keys(self._screenshareSessions)); + webSocket.sendMessage(message.voiceBridge); + } + } break; - } - session = new Screenshare(sessionId, self._bbbGW, - sessionId, message.callerName, message.vh, message.vw, - message.internalMeetingId); + case 'stop': + console.log('[' + message.id + '] connection ' + connectionId); - self._screenshareSessions[sessionId] = {} - self._screenshareSessions[sessionId] = session; + if (session) { + session._stop(sessionId); + } else { + console.log(" [stop] Why is there no session on STOP?"); + } + break; - // starts presenter by sending sessionID, websocket and sdpoffer - session._startPresenter(sessionId, message.sdpOffer, function(error, sdpAnswer) { - console.log(" [ScreenshareManager] Started presenter " + sessionId); - if (error) { - self._bbbGW.publish(JSON.stringify({ - id : 'presenterResponse', - response : 'rejected', - message : error - }), C.FROM_SCREENSHARE); - return error; + case 'onIceCandidate': + if (session) { + console.log(" [CM] What the fluff is happening"); + session._onIceCandidate(message.candidate); + } else { + console.log(" [iceCandidate] Why is there no session on ICE CANDIDATE?"); } + break; - self._bbbGW.publish(JSON.stringify({ - id : 'presenterResponse', - response : 'accepted', - sdpAnswer : sdpAnswer - }), C.FROM_SCREENSHARE); + case 'ping': + webSocket.send(JSON.stringify({ + id : 'pong', + response : 'accepted' + })); + break; - console.log(" [ScreenshareManager] [websocket] Sending presenterResponse \n" + sdpAnswer); - }); - break; - case 'viewer': - console.log(" [ScreenshareManager][viewer] Session output \n " + session); - if (message.sdpOffer && message.voiceBridge) { + case 'viewerIceCandidate': + console.log("[viewerIceCandidate] Session output => " + session); if (session) { - session._startViewer(message.voiceBridge, message.sdpOffer, message.callerName, self._screenshareSessions[message.voiceBridge]._presenterEndpoint); + session._onViewerIceCandidate(message.candidate, message.callerName); } else { - // TODO ERROR HANDLING + console.log("[iceCandidate] Why is there no session on ICE CANDIDATE?"); } - } - break; - - case 'stop': - console.log('[' + message.id + '] connection ' + sessionId); - - if (session) { - session._stop(sessionId); - } else { - console.log(" [stop] Why is there no session on STOP?"); - } - break; - - case 'onIceCandidate': - if (session) { - session._onIceCandidate(message.candidate); - } else { - console.log(" [iceCandidate] Why is there no session on ICE CANDIDATE?"); - } - break; - - case 'ping': - self._bbbGW.publish(JSON.stringify({ - id : 'pong', - response : 'accepted' - }), C.FROM_SCREENSHARE); - break; - - - case 'viewerIceCandidate': - console.log("[viewerIceCandidate] Session output => " + session); - if (session) { - session._onViewerIceCandidate(message.candidate, message.callerName); - } else { - console.log("[iceCandidate] Why is there no session on ICE CANDIDATE?"); - } - break; - - default: - self._bbbGW.publish(JSON.stringify({ - id : 'error', - message: 'Invald message ' + message - }), C.FROM_SCREENSHARE); - break; - } + break; + + default: + webSocket.sendMessage({ id : 'error', message : 'Invalid message ' + message }); + break; + } + }); } _stopSession(sessionId) { diff --git a/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js b/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js index b2fb752838257ab48e17ce3f7794d144c51a718a..e8755e9024cb276cb52ecc0eacdae81e711f094c 100644 --- a/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js +++ b/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js @@ -10,4 +10,3 @@ process.on('disconnect',function() { }); c = new ScreenshareManager(); -c.start(); diff --git a/labs/node-bbb-apps/lib/screenshare/screenshare.js b/labs/node-bbb-apps/lib/screenshare/screenshare.js index 66ec882b31c8bb345efee2d76f1f407f60fbad5c..75cfccf742da95673e571168d7fba305d29669ba 100644 --- a/labs/node-bbb-apps/lib/screenshare/screenshare.js +++ b/labs/node-bbb-apps/lib/screenshare/screenshare.js @@ -31,7 +31,8 @@ if (config.get('acceptSelfSignedCertificate')) { } module.exports = class Screenshare { - constructor(id, bbbgw, voiceBridge, caller, vh, vw, meetingId) { + constructor(ws, id, bbbgw, voiceBridge, caller, vh, vw, meetingId) { + this._ws = ws; this._id = id; this._BigBlueButtonGW = bbbgw; this._presenterEndpoint = null; @@ -52,17 +53,18 @@ module.exports = class Screenshare { let candidate = kurento.getComplexType('IceCandidate')(_candidate); if (this._presenterEndpoint) { + console.log(" [screenshare] Adding ICE candidate to presenter"); this._presenterEndpoint.addIceCandidate(candidate); } else { this._candidatesQueue.push(candidate); } }; - + _onViewerIceCandidate(_candidate, callerName) { console.log("onviewericecandidate callerName = " + callerName); let candidate = kurento.getComplexType('IceCandidate')(_candidate); - + if (this._viewersEndpoint[callerName]) { this._viewersEndpoint[callerName].addIceCandidate(candidate); } @@ -74,14 +76,14 @@ module.exports = class Screenshare { } } - _startViewer(voiceBridge, sdp, callerName, presenterEndpoint, callback) { + _startViewer(ws, voiceBridge, sdp, callerName, presenterEndpoint, callback) { let self = this; let _callback = function(){}; console.log("startviewer callerName = " + callerName); self._viewersCandidatesQueue[callerName] = []; - + console.log("VIEWER VOICEBRIDGE: "+self._voiceBridge); - + MediaController.createMediaElement(voiceBridge, C.WebRTC, function(error, webRtcEndpoint) { if (error) { console.log("Media elements error" + error); @@ -106,11 +108,7 @@ module.exports = class Screenshare { // ICE NEGOTIATION WITH THE ENDPOINT self._viewersEndpoint[callerName].on('OnIceCandidate', function(event) { - let candidate = kurento.getComplexType('IceCandidate')(event.candidate); - self._BigBlueButtonGW.publish(JSON.stringify({ - id : 'iceCandidate', - candidate : candidate - }), C.FROM_SCREENSHARE); + let candidate = kurento.getComplexType('IceCandidate')(event.candidate); ws.sendMessage({ id : 'iceCandidate', candidate : candidate }); }); sdp = h264_sdp.transform(sdp); @@ -121,11 +119,7 @@ module.exports = class Screenshare { //pipeline.release(); return _callback(error); } - self._BigBlueButtonGW.publish(JSON.stringify({ - id: "viewerResponse", - sdpAnswer: webRtcSdpAnswer, - response: "accepted" - }), C.FROM_SCREENSHARE); + ws.sendMessage({id: "viewerResponse", sdpAnswer: webRtcSdpAnswer, response: "accepted"}); console.log(" Sent sdp message to client with callerName:" + callerName); MediaController.gatherCandidates(webRtcEndpoint.id, function(error) { @@ -134,12 +128,12 @@ module.exports = class Screenshare { } self._viewersEndpoint[callerName].on('MediaFlowInStateChange', function(event) { - if (event.state === 'NOT_FLOWING') { - console.log(" NOT FLOWING "); - } - else if (event.state === 'FLOWING') { - console.log(" FLOWING "); - } + if (event.state === 'NOT_FLOWING') { + console.log(" NOT FLOWING "); + } + else if (event.state === 'FLOWING') { + console.log(" FLOWING "); + } }); }); }); @@ -147,7 +141,7 @@ module.exports = class Screenshare { } - _startPresenter(id, sdpOffer, callback) { + _startPresenter(id, ws, sdpOffer, callback) { let self = this; let _callback = callback; @@ -189,11 +183,7 @@ module.exports = class Screenshare { self._presenterEndpoint.on('OnIceCandidate', function(event) { let candidate = kurento.getComplexType('IceCandidate')(event.candidate); - self._BigBlueButtonGW.publish(JSON.stringify({ - id : 'iceCandidate', - cameraId: id, - candidate : candidate - }), C.FROM_SCREENSHARE); + ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate }); }); MediaController.processOffer(webRtcEndpoint.id, sdpOffer, function(error, webRtcSdpAnswer) { @@ -254,7 +244,6 @@ module.exports = class Screenshare { } else { console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); } - if (this._ffmpegRtpEndpoint) { MediaController.releaseMediaElement(this._ffmpegRtpEndpoint.id); this._ffmpegRtpEndpoint = null; @@ -273,7 +262,7 @@ module.exports = class Screenshare { let self = this; let strm = Messaging.generateStopTranscoderRequestMessage(this._meetingId, this._meetingId); - self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN); + self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {}); // Interoperability: capturing 1.1 stop_transcoder_reply messages self._BigBlueButtonGW.once(C.STOP_TRANSCODER_REPLY, function(payload) { @@ -290,6 +279,7 @@ module.exports = class Screenshare { } _onRtpMediaFlowing(meetingId, rtpParams) { + console.log(" [screenshare] Media FLOWING for meeting => " + meetingId); let self = this; let strm = Messaging.generateStartTranscoderRequestMessage(meetingId, meetingId, rtpParams); @@ -308,21 +298,23 @@ module.exports = class Screenshare { }); - self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN); + self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN, function(error) {}); }; _stopRtmpBroadcast (meetingId) { - var self = this; + console.log(" [screenshare] _stopRtmpBroadcast for meeting => " + meetingId); + let self = this; if(self._meetingId === meetingId) { // TODO correctly assemble this timestamp let timestamp = now.format('hhmmss'); let dsrstom = Messaging.generateScreenshareRTMPBroadcastStoppedEvent2x(self._voiceBridge, self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp); - self._BigBlueButtonGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN); + self._BigBlueButtonGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {}); } } _startRtmpBroadcast (meetingId, output) { + console.log(" [screenshare] _startRtmpBroadcast for meeting => " + meetingId); var self = this; if(self._meetingId === meetingId) { // TODO correctly assemble this timestamp @@ -331,11 +323,25 @@ module.exports = class Screenshare { let dsrbstam = Messaging.generateScreenshareRTMPBroadcastStartedEvent2x(self._voiceBridge, self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp); - self._BigBlueButtonGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN); + self._BigBlueButtonGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN, function(error) {}); } } _onRtpMediaNotFlowing() { console.log(" [screenshare] TODO RTP NOT_FLOWING"); }; + + _stopViewer(id) { + let viewer = this._viewersEndpoint[id]; + console.log(' [stop] Releasing endpoints for ' + id); + + if (viewer) { + MediaController.releaseMediaElement(viewer.id); + this._viewersEndpoint[viewer.id] = null; + } else { + console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); + } + + delete this._viewersCandidatesQueue[id]; + }; }; diff --git a/labs/node-bbb-apps/server.js b/labs/node-bbb-apps/server.js index a0b2e12da65cde955b7b257a7a2e115e4b63ce98..94b4ec868b85e7a6608680639cd620d4ce3f8dd9 100755 --- a/labs/node-bbb-apps/server.js +++ b/labs/node-bbb-apps/server.js @@ -7,8 +7,8 @@ const ConnectionManager = require('./lib/connection-manager/ConnectionManager'); const HttpServer = require('./lib/connection-manager/HttpServer'); -const server = new HttpServer(); -const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager'); +//const server = new HttpServer(); +//const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager'); const cp = require('child_process'); let screenshareProc = cp.fork('./lib/screenshare/ScreenshareProcess', { @@ -47,9 +47,9 @@ videoProc.on('message',onMessage); videoProc.on('error',onError); videoProc.on('disconnect',onDisconnect); -const CM = new ConnectionManager(screenshareProc, videoProc); +//const CM = new ConnectionManager(screenshareProc, videoProc); -let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/kurento-screenshare'); +//let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/kurento-screenshare'); process.on('SIGTERM', process.exit) process.on('SIGINT', process.exit) @@ -59,9 +59,9 @@ process.on('uncaughtException', function (error) { }); -CM.setHttpServer(server); -CM.addAdapter(websocketManager); - -CM.listen(() => { - console.log(" [SERVER] Server started"); -}); +//CM.setHttpServer(server); +//CM.addAdapter(websocketManager); +// +//CM.listen(() => { +// console.log(" [SERVER] Server started"); +//});