diff --git a/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js b/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js index e7052be0c5738367242f1f257ef75ccb8eec5475..dcfbdb3876eb7726d2637b1e770ee5ac448a394e 100644 --- a/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js +++ b/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js @@ -53,7 +53,7 @@ module.exports = class ConnectionManager { case "screenshare": self._bbbGW.publish(JSON.stringify(data), C.TO_SCREENSHARE); break; - + case "video": self._bbbGW.publish(JSON.stringify(data), C.TO_VIDEO); break; diff --git a/labs/node-bbb-apps/lib/mcs-core/CoreProcess.js b/labs/node-bbb-apps/lib/mcs-core/CoreProcess.js new file mode 100644 index 0000000000000000000000000000000000000000..ee03958e42a0d3d54b74e7c76a2a1fef19234b7a --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/CoreProcess.js @@ -0,0 +1,12 @@ +const MCSApiStub = require('./media/MCSApiStub'); + +process.on('uncaughtException', function (error) { + console.log(error.stack); +}); + +process.on('disconnect',function() { + console.log("Parent exited!"); + process.kill(); +}); + +core = new MCSApiStub(); diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js b/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js index 341fe4309ca6efbdc38526c6c9ebc3fb855226d1..94f7acc9dc9f89aa14094232af1bb300e47468e7 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js @@ -44,12 +44,14 @@ exports.EVENT.SESSION_ID_EVENT = "SESSION_ID" exports.EVENT.AUDIO_SESSION_TERMINATED = "AUDIO_SESSION_TERMINATED" // Media server state changes +exports.EVENT.NEW_SESSION = "NewSession" exports.EVENT.MEDIA_STATE = {}; exports.EVENT.MEDIA_STATE.MEDIA_EVENT = "MediaEvent" exports.EVENT.MEDIA_STATE.CHANGED = "MediaStateChanged" exports.EVENT.MEDIA_STATE.FLOW_OUT = "MediaFlowOutStateChange" exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange" exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream" +exports.EVENT.MEDIA_STATE.ICE = "OnIceCandidate" diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js index 1410b60982d3c3953430dcdec4422484fd691def..f0d690d62eb2ab19a6ae97214abe3a89a6616b5c 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js @@ -7,11 +7,18 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var MediaController = require('./MediaController.js'); +let instance = null; -module.exports = class MCSApiStub extends EventEmitter { +module.exports = class MCSApiStub extends EventEmitter{ constructor() { - super(); - this._mediaController = new MediaController(); + if(!instance) { + super(); + this.listener = new EventEmitter(); + this._mediaController = new MediaController(this.listener); + instance = this; + } + + return instance; } async join (room, type, params) { @@ -51,6 +58,12 @@ module.exports = class MCSApiStub extends EventEmitter { async publish (user, room, type, params) { try { + this.listener.once(C.EVENT.NEW_SESSION+user, (event) => { + let sessionId = event; + this.listener.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, (event) => { + this.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+sessionId, event); + }); + }); const answer = await this._mediaController.publish(user, room, type, params); return Promise.resolve(answer); } @@ -74,6 +87,10 @@ module.exports = class MCSApiStub extends EventEmitter { async subscribe (user, sourceId, type, params) { try { const answer = await this._mediaController.subscribe(user, sourceId, type, params); + this.listener.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+answer.sessionId, (event) => { + this.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+answer.sessionId, event); + }); + return Promise.resolve(answer); } catch (err) { @@ -89,10 +106,36 @@ module.exports = class MCSApiStub extends EventEmitter { } catch (err) { console.log(err); - Promise.reject(err); + return Promise.reject(err); } } + async onEvent (eventName, mediaId) { + try { + const eventTag = this._mediaController.onEvent(eventName, mediaId); + this._mediaController.on(eventTag, (event) => { + this.emit(eventTag, event); + }); + + return Promise.resolve(eventTag); + } + catch (err) { + console.log(err); + return Promise.reject(); + } + } + + async addIceCandidate (mediaId, candidate) { + try { + console.log(" [api] Adding ice candidate for => " + mediaId); + const ack = await this._mediaController.addIceCandidate(mediaId, candidate); + return Promise.resolve(ack); + } + catch (err) { + console.log(err); + Promise.reject(); + } + } setStrategy (strategy) { // TODO } diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js index 4afcacf1450529ac38a322bd128b1a263da331a1..de9b888cfef941ce1412f9c9c073f2bd1d25344c 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js @@ -1,13 +1,13 @@ 'use strict' -var config = require('config'); -var C = require('../constants/Constants'); +const config = require('config'); +const C = require('../constants/Constants'); // Model -var SfuUser = require('../model/SfuUser'); -var Room = require('../model/Room.js'); -var mediaServer = require('./media-server'); +const SfuUser = require('../model/SfuUser'); +const Room = require('../model/Room.js'); +const EventEmitter = require('events').EventEmitter; /* PRIVATE ELEMENTS */ /** @@ -29,11 +29,20 @@ function getVideoPort() { /* PUBLIC ELEMENTS */ +let instance = null; + + module.exports = class MediaController { - constructor() { - // Kurento - this._rooms = {}; - this._users = {}; + constructor(emitter) { + if (!instance) { + this.emitter = emitter; + this._rooms = {}; + this._users = {}; + this._mediaSessions = {}; + instance = this; + } + + return instance; } start (_kurentoClient, _kurentoToken, callback) { @@ -60,18 +69,20 @@ module.exports = class MediaController { } async join (roomId, type, params) { - console.log("[mcs] join"); + console.log("[mcs] Join room => " + roomId + ' as ' + type); try { + let session; const room = await this.createRoomMCS(roomId); const user = await this.createUserMCS(roomId, type, params); let userId = user.id; room.setUser(user); if (params.sdp) { - const sessionId = user.addSdp(params.sdp); + session = user.addSdp(params.sdp); } if (params.uri) { - const sessionId = user.addUri(params.sdp); + session = user.addUri(params.sdp); } + console.log("[mcs] Resolving user " + userId); return Promise.resolve(userId); } @@ -85,13 +96,22 @@ module.exports = class MediaController { console.log("[mcs] pns"); let type = params.type; try { - const user = await this.getUserMCS(userId); + user = this.getUserMCS(userId); let userId = user.id; - const sessionId = user.addSdp(sdp, type); - const answer = await user.startSession(sessionId); - await user.connect(sourceId, sessionId); - console.log("[mcs] user with sdp session " + sessionId); - console.log(user); + let session = user.addSdp(sdp, type); + let sessionId = session.id; + + if (typeof this._mediaSessions[session.id] == 'undefined' || + !this._mediaSessions[session.id]) { + this._mediaSessions[session.id] = {}; + } + + this._mediaSessions[session.id] = session; + + const answer = await user.startSession(session.id); + await user.connect(sourceId, session.id); + + console.log("[mcs] user with sdp session " + session.id); return Promise.resolve({userId, sessionId}); } catch (err) { @@ -102,68 +122,109 @@ module.exports = class MediaController { async publish (userId, roomId, type, params) { console.log("[mcs] publish"); - let mediaId; + let session; // TODO handle mediaType let mediaType = params.mediaType; let answer; try { + console.log(" [mcs] Fetching user => " + userId); + const user = await this.getUserMCS(userId); + console.log(" [mcs] Fetched user => " + user); + switch (type) { - case "SDP": - mediaId = user.addSdp(params.descriptor, type); - answer = await user.startSession(mediaId); + case "RtpEndpoint": + case "WebRtcEndpoint": + session = user.addSdp(params.descriptor, type); + + answer = await user.startSession(session.id); break; case "URI": - mediaId = user.addUri(params.descriptor, type); - answer = await user.startSession(mediaId); + session = user.addUri(params.descriptor, type); + + answer = await user.startSession(session.id); break; default: return Promise.reject(new Error("[mcs] Invalid media type")); } } catch (err) { - return Promise.reject(new Error(err)); + console.log(err); + return Promise.reject(err); + } + + if (typeof this._mediaSessions[session.id] == 'undefined' || + !this._mediaSessions[session.id]) { + this._mediaSessions[session.id] = {}; } - console.log(user); - return Promise.resolve({answer, mediaId}); + + this._mediaSessions[session.id] = session; + let sessionId = session.id; + + return Promise.resolve({answer, sessionId}); } async subscribe (userId, type, sourceId, params) { - let mediaId; + console.log(" [mcs] subscribe"); + let session; // TODO handle mediaType let mediaType = params.mediaType; + let answer; + let sourceSession = this._mediaSessions[sourceId]; + + if (typeof sourceSession === 'undefined') { + return Promise.reject(new Error(" [mcs] Media session " + sourceId + " was not found")); + } try { - const user = this.getUserMCS(userId); + console.log(" [mcs] Fetching user => " + userId); + + const user = await this.getUserMCS(userId); + + console.log(" [mcs] Fetched user => " + user); switch (type) { - case "SDP": - mediaId = user.addSdp(params.descriptor, type); - await user.connect(sourceId, mediaId); - const answer = await user.startSession(mediaId); + case "RtpEndpoint": + case "WebRtcEndpoint": + session = user.addSdp(params.descriptor, type); + + answer = await user.startSession(session.id); + await sourceSession.connect(session._mediaElement); + break; case "URI": - //TODO - //mediaId = user.addUri(params.descriptor); + session = user.addUri(params.descriptor, type); + answer = await user.startSession(session.id); + await sourceSession.connect(session._mediaElement); + break; default: return Promise.reject(new Error("[mcs] Invalid media type")); } } catch (err) { - return Promise.reject(new Error(err)); + console.log(err); + return Promise.reject(err); } - return Promise.resolve({answer, mediaId}); - } + if (typeof this._mediaSessions[session.id] == 'undefined' || + !this._mediaSessions[session.id]) { + this._mediaSessions[session.id] = {}; + } + this._mediaSessions[session.id] = session; + let sessionId = session.id; + + return Promise.resolve({answer, sessionId}); + } async unpublish (userId, mediaId) { try { const user = this.getUserMCS(userId); const answer = await user.unpublish(mediaId); + this._mediaSessions[mediaId] = null; return Promise.resolve(answer); } catch (err) { @@ -176,18 +237,35 @@ module.exports = class MediaController { const user = this.getUserMCS(userId); const answer = await user.unsubscribe(mediaId); return Promise.resolve(); + this._mediaSessions[mediaId] = null; } catch (err) { return Promise.reject(new Error(err)); } } + async addIceCandidate (mediaId, candidate) { + let session = this._mediaSessions[mediaId]; + if (typeof session === 'undefined') { + return Promise.reject(new Error(" [mcs] Media session " + mediaId + " was not found")); + } + try { + console.log(" [mcs] Adding ICE candidate for => " + mediaId); + const ack = await session.addIceCandidate(candidate); + return Promise.resolve(ack); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + /** * Creates an empty {Room} room and indexes it * @param {String} roomId */ async createRoomMCS (roomId) { - var self = this; + let self = this; console.log(" [media] Creating new room with ID " + roomId); @@ -209,7 +287,7 @@ module.exports = class MediaController { switch (type) { case C.USERS.SFU: - user = new SfuUser(roomId, type, params.userAgentString, params.sdp); + user = new SfuUser(roomId, type, this.emitter, params.userAgentString, params.sdp); break; case C.USERS.MCU: console.log(" [media] createUserMCS MCU TODO"); @@ -226,6 +304,6 @@ module.exports = class MediaController { } getUserMCS (userId) { - Promise.resolve(self._users[user.id]); + return this._users[userId]; } } diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js index 118c860cd2f6b4d730d4d916a90f5ba7809f91ed..7ba91f197ee19ae540f2eba3ef64ccd6be02b5b9 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js @@ -6,14 +6,27 @@ const mediaServerClient = require('kurento-client'); const util = require('util'); const EventEmitter = require('events').EventEmitter; +let instance = null; + /* Public members */ -exports = class MediaServer extends EventEmitter { +module.exports = class MediaServer extends EventEmitter { constructor(serverUri) { - super(); - this._serverUri = serverUri; - this._mediaServer = _getMediaServerClient(this._serverUri); - this._mediaPipelines = {}; - this._mediaElements= {}; + if(!instance){ + super(); + this._serverUri = serverUri; + this._mediaPipelines = {}; + this._mediaElements= {}; + this._mediaServer; + instance = this; + } + + return instance; + } + + async init () { + if (typeof this._mediaServer === 'undefined' || !this._mediaServer) { + this._mediaServer = await this._getMediaServerClient(this._serverUri); + } } _getMediaServerClient (serverUri) { @@ -22,6 +35,7 @@ exports = class MediaServer extends EventEmitter { if (error) { reject(error); } + console.log(" [media] Retrieved media server client => " + client); resolve(client); }); }); @@ -29,17 +43,17 @@ exports = class MediaServer extends EventEmitter { _getMediaPipeline (conference) { return new Promise((resolve, reject) => { - if (_mediaPipelines[conference]) { - console.log(' [media] Pipeline already exists. ' + JSON.stringify(_mediaPipelines, null, 2)); - resolve(_mediaPipelines[conference]); + if (this._mediaPipelines[conference]) { + console.log(' [media] Pipeline already exists. ' + JSON.stringify(this._mediaPipelines, null, 2)); + resolve(this._mediaPipelines[conference]); } else { - this._mediaServer.create('MediaPipeline', function(err, pipeline) { + this._mediaServer.create('MediaPipeline', (error, pipeline) => { if (error) { console.log(error); reject(error); } - _mediaPipelines[conference] = pipeline; + this._mediaPipelines[conference] = pipeline; resolve(pipeline); }); } @@ -47,7 +61,7 @@ exports = class MediaServer extends EventEmitter { } _releasePipeline (pipelineId) { - let mediaPipeline = _mediaPipelines[pipelineId]; + let mediaPipeline = this._mediaPipelines[pipelineId]; if (typeof mediaPipeline !== 'undefined' && typeof mediaPipeline.release === 'function') { mediaElement.release(); @@ -58,11 +72,11 @@ exports = class MediaServer extends EventEmitter { return new Promise((resolve, reject) => { pipeline.create(type, (error, mediaElement) => { if (error) { - reject(error); + return reject(error); } console.log(" [MediaController] Created [" + type + "] media element: " + mediaElement.id); - _mediaElements[mediaElement.id] = mediaElement; - resolve(mediaElement); + this._mediaElements[mediaElement.id] = mediaElement; + return resolve(mediaElement); }); }); } @@ -72,16 +86,16 @@ exports = class MediaServer extends EventEmitter { try { const pipeline = await this._getMediaPipeline(conference); const mediaElement = await this._createElement(pipeline, type); - Promise.resolve(mediaElement.id); + return Promise.resolve(mediaElement.id); } catch (err) { - return Promise.reject(new Error(e)); + return Promise.reject(new Error(err)); } } - async connectMediaElements (sourceId, sinkId, type) { - let source = _mediaElements[sourceId]; - let sink = _mediaElements[sinkId]; + async connect (sourceId, sinkId, type) { + let source = this._mediaElements[sourceId]; + let sink = this._mediaElements[sinkId]; if (source && sink) { return new Promise((resolve, reject) => { @@ -89,9 +103,9 @@ exports = class MediaServer extends EventEmitter { case 'ALL': source.connect(sink, (error) => { if (error) { - reject(error); + return reject(error); } - resolve(); + return resolve(); }); break; @@ -100,22 +114,23 @@ exports = class MediaServer extends EventEmitter { case 'VIDEO': source.connect(sink, (error) => { if (error) { - reject(error); + return reject(error); } - resolve(); + return resolve(); }); break; - default: reject("[mcs] Invalid connect type"); + default: return reject("[mcs] Invalid connect type"); } }); } - - return Promise.reject("Failed to connect " + type + ": " + sourceId + " to " + sinkId); + else { + return Promise.reject("Failed to connect " + type + ": " + sourceId + " to " + sinkId); + } } stop (elementId) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; // TODO remove event listeners if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { mediaElement.release(); @@ -124,31 +139,42 @@ exports = class MediaServer extends EventEmitter { addIceCandidate (elementId, candidate) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; + let kurentoCandidate = mediaServerClient.getComplexType('IceCandidate')(candidate); - if (typeof mediaElement !== 'undefined' && typeof mediaElement.addIceCandidate === 'function') { + if (typeof mediaElement !== 'undefined' && typeof mediaElement.addIceCandidate === 'function' && + typeof candidate !== 'undefined') { mediaElement.addIceCandidate(candidate); + console.log(" [media] Added ICE candidate for => " + elementId); + return Promise.resolve(); + } + else { + return Promise.reject(new Error("Candidate could not be parsed or media element does not exist")); } } gatherCandidates (elementId) { - let mediaElement = _mediaElements[elementId]; + console.log(' [media] Gathering ICE candidates for ' + elementId); + let mediaElement = this._mediaElements[elementId]; return new Promise((resolve, reject) => { if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') { mediaElement.gatherCandidates((error) => { if (error) { - reject(error); + return reject(new Error(error)); } - resolve(); + console.log(' [media] Triggered ICE gathering for ' + elementId); + return resolve(); }); } - reject(" [MediaController/gatherCandidates] There is no element " + elementId); + else { + return reject(" [MediaController/gatherCandidates] There is no element " + elementId); + } }); } setInputBandwidth (elementId, min, max) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; if (typeof mediaElement !== 'undefined') { endpoint.setMinVideoRecvBandwidth(min); @@ -159,7 +185,7 @@ exports = class MediaServer extends EventEmitter { } setOutputBandwidth (endpoint, min, max) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; if (typeof mediaElement !== 'undefined') { endpoint.setMinVideoSendBandwidth(min); @@ -170,7 +196,7 @@ exports = class MediaServer extends EventEmitter { } setOutputBitrate (endpoint, min, max) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; if (typeof mediaElement !== 'undefined') { endpoint.setMinOutputBitrate(min); @@ -181,35 +207,43 @@ exports = class MediaServer extends EventEmitter { } processOffer (elementId, sdpOffer) { - let mediaElement = _mediaElements[elementId]; + let mediaElement = this._mediaElements[elementId]; return new Promise((resolve, reject) => { if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') { mediaElement.processOffer(sdpOffer, (error, answer) => { if (error) { - reject(error); + return reject(error); } - resolve(answer); + return resolve(answer); }); } - reject(" [MediaController/processOffer] There is no element " + elementId); + else { + return reject(" [MediaController/processOffer] There is no element " + elementId); + } }); } trackMediaState (elementId, type) { switch (type) { case C.MEDIA_TYPE.URI: - this.addMediaEventListener(elementId, C.MEDIA_TYPE.ENDOFSTREAM); - this.addMediaEventListener(elementId, C.MEDIA_TYPE.CHANGED); - this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_IN); - this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_OUT); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.ENDOFSTREAM, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.CHANGED, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_IN, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_OUT, elementId); break; case C.MEDIA_TYPE.WEBRTC: + this.addMediaEventListener(C.EVENT.MEDIA_STATE.CHANGED, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_IN, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_OUT, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.ICE, elementId); + break; + case C.MEDIA_TYPE.RTP: - this.addMediaEventListener(elementId, C.MEDIA_TYPE.CHANGED); - this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_IN); - this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_OUT); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.CHANGED, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_IN, elementId); + this.addMediaEventListener(C.EVENT.MEDIA_STATE.FLOW_OUT, elementId); break; default: return; @@ -217,12 +251,17 @@ exports = class MediaServer extends EventEmitter { return; } - addMediaEventListener (elementId, eventTag) { - let mediaElement = _mediaElements[elementId]; + addMediaEventListener (eventTag, elementId) { + let mediaElement = this._mediaElements[elementId]; // TODO event type validator if (typeof mediaElement !== 'undefined' && mediaElement) { + console.log(' [media] Adding media state listener [' + eventTag + '] for ' + elementId); mediaElement.on(eventTag, (event) => { - this.emit(C.MEDIA_STATE.MEDIA_EVENT , {elementId, eventTag, event}); + if (eventTag === C.EVENT.MEDIA_STATE.ICE) { + console.log(" [media] Relaying ICE for MediaState" + elementId); + event.candidate = mediaServerClient.getComplexType('IceCandidate')(event.candidate); + } + this.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+elementId , {eventTag, event}); }); } } diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js index 752f971d47b650f3958a57a35a472516255b725f..be8261bff04f418201971000ed8809ede3b4da24 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js @@ -10,11 +10,14 @@ const SdpWrapper = require('../utils/SdpWrapper'); const uuidv4 = require('uuid/v4'); const EventEmitter = require('events').EventEmitter; const MediaServer = require('../media/media-server'); +const config = require('config'); +const kurentoUrl = config.get('kurentoUrl'); -module.exports = class SdpSession extends EventEmitter { - constructor(sdp = null, type = C.MEDIA_TYPE.RTP) { - super(); +module.exports = class SdpSession { + constructor(emitter, sdp = null, room, type = 'WebRtcEndpoint') { this.id = uuidv4(); + this.room = room; + this.emitter = emitter; this._status = C.STATUS.STOPPED; this._type = type; // {SdpWrapper} SdpWrapper @@ -22,6 +25,8 @@ module.exports = class SdpSession extends EventEmitter { if (sdp && type) { this.setSdp(sdp, type); } + this._MediaServer = new MediaServer(kurentoUrl); + this._mediaElement; } async setSdp (sdp, type) { @@ -32,15 +37,30 @@ module.exports = class SdpSession extends EventEmitter { async start (sdpId) { this._status = C.STATUS.STARTING; try { - console.log("[mcs] start/cme"); - const mediaElement = await MediaServer.createMediaElement(this.id, this._type); - console.log("[mcs] start/po " + mediaElement); - const answer = await MediaServer.processOffer(mediaElement, this._sdp.getMainDescription()); - Promise.resolve(answer); + const client = await this._MediaServer.init(); + + console.log("[SdpSession] start/cme"); + this._mediaElement = await this._MediaServer.createMediaElement(this.room, this._type); + console.log("[SdpSession] start/po " + this._mediaElement); + + this._MediaServer.trackMediaState(this._mediaElement, this._type); + this._MediaServer.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+this._mediaElement, (event) => { + console.log(" [SdpSession] Relaying EVENT MediaState" + this.id); + event.id = this.id; + this.emitter.emit(C.EVENT.MEDIA_STATE.MEDIA_EVENT+this.id, event); + }); + + const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getMainDescription()); + + if (this._type === 'WebRtcEndpoint') { + this._MediaServer.gatherCandidates(this._mediaElement); + } + + return Promise.resolve(answer); } catch (err) { this.handleError(err); - Promise.reject(err); + return Promise.reject(err); } } @@ -48,7 +68,7 @@ module.exports = class SdpSession extends EventEmitter { async stop () { this._status = C.STATUS.STOPPING; try { - await MediaServer.stop(this.id); + await this._MediaServer.stop(this.id); this._status = C.STATUS.STOPPED; Promise.resolve(); } @@ -60,17 +80,34 @@ module.exports = class SdpSession extends EventEmitter { // TODO move to parent Session + // TODO handle connection type async connect (sinkId) { try { - await MediaServer.connect(this.id, sinkId); - Promise.resolve(); + console.log(" [SdpSession] Connecting " + this._mediaElement + " => " + sinkId); + await this._MediaServer.connect(this._mediaElement, sinkId, 'ALL'); + return Promise.resolve(); } catch (err) { this.handleError(err); + return Promise.reject(err); + } + } + + async addIceCandidate (candidate) { + try { + console.log(" [SdpSession] Adding ICE candidate for => " + this._mediaElement); + await this._MediaServer.addIceCandidate(this._mediaElement, candidate); + Promise.resolve(); + } + catch (err) { Promise.reject(err); } } + addMediaEventListener (type, mediaId) { + this._MediaServer.addMediaEventListener (type, mediaId); + } + handleError (err) { console.log(err); this._status = C.STATUS.STOPPED; diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js index 728447b6efecd32f6aa3ac39f644a8569e05234c..07020701edc00f05ad7ef368fa59da6c2646868c 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js @@ -12,13 +12,14 @@ const SdpSession = require('../model/SdpSession'); const UriSession = require('../model/UriSession'); module.exports = class SfuUser extends User { - constructor(_roomId, type, userAgentString = C.STRING.ANONYMOUS, sdp = null, uri = null) { + constructor(_roomId, type, emitter, userAgentString = C.STRING.ANONYMOUS, sdp = null, uri = null) { super(_roomId); // {SdpWrapper} SdpWrapper this._sdp; // {Object} hasAudio, hasVideo, hasContent this._mediaSessions = {} this.userAgentString; + this.emitter = emitter; if (sdp) { this.addSdp(sdp); } @@ -40,7 +41,7 @@ module.exports = class SfuUser extends User { await this.startSession(session.id); Promise.resolve(session.id); } - catch (error) { + catch (err) { this.handleError(err); Promise.reject(new Error(err)); } @@ -48,52 +49,55 @@ module.exports = class SfuUser extends User { addSdp (sdp, type) { // TODO switch from type to children SdpSessions (WebRTC|SDP) - let session = new SdpSession(sdp, type); + let session = new SdpSession(this.emitter, sdp, this.roomId, type); + this.emitter.emit(C.EVENT.NEW_SESSION+this.dd, session.id); if (typeof this._mediaSessions[session.id] == 'undefined' || !this._mediaSessions[session.id]) { this._mediaSessions[session.id] = {}; } this._mediaSessions[session.id] = session; - console.log("[mcs] Added SDP " + session.id); + console.log("[SfuUser] Added SDP " + session.id); - return session.id; + return session; } async startSession (sessionId) { - console.log("[mcs] starting session " + sessionId); + console.log("[SfuUser] starting session " + sessionId); let session = this._mediaSessions[sessionId]; try { const answer = await session.start(); - Promise.resolve(answer); + console.log("WELL"); + console.log(answer); + return Promise.resolve(answer); } - catch (error) { + catch (err) { this.handleError(err); - Promise.reject(new Error(err)); + return Promise.reject(new Error(err)); } } async subscribe (sdp, mediaId) { - let sessionId = await this.addSdp(sdp); + let session = await this.addSdp(sdp); try { - await this.startSession(sessionId); - await this.connect(sessionId, mediaId); + await this.startSession(session.id); + await this.connect(session.id, mediaId); Promise.resolve(); } - catch (error) { + catch (err) { this.handleError(err); Promise.reject(new Error(err)); } } async publish (sdp, mediaId) { - let sessionId = await this.addSdp(sdp); + let session = await this.addSdp(sdp); try { - await this.startSession(sessionId); + await this.startSession(session.id); Promise.resolve(); } - catch (error) { + catch (err) { this.handleError(err); Promise.reject(new Error(err)); } @@ -101,10 +105,10 @@ module.exports = class SfuUser extends User { async unsubscribe (sdp, mediaId) { try { - await this.stopSession(sessionId); + await this.stopSession(mediaId); Promise.resolve(); } - catch (error) { + catch (err) { this.handleError(err); Promise.reject(new Error(err)); } @@ -112,10 +116,10 @@ module.exports = class SfuUser extends User { async unpublish (sdp, mediaId) { try { - await this.stopSession(sessionId); + await this.stopSession(mediaId); Promise.resolve(); } - catch (error) { + catch (err) { this.handleError(err); Promise.reject(new Error(err)); } @@ -126,7 +130,8 @@ module.exports = class SfuUser extends User { try { await session.stop(); - Promise.resolve(); + this._mediaSessions[sdpId] = null; + return Promise.resolve(); } catch (err) { this.handleError(err); @@ -135,17 +140,21 @@ module.exports = class SfuUser extends User { } async connect (sourceId, sinkId) { - let session = this._mediaSessions[sessionId]; + let session = this._mediaSessions[sourceId]; if(session) { try { + console.log(" [SfuUser] Connecting sessions " + sourceId + "=>" + sinkId); await session.connect(sinkId); - Promise.resolve(); + return Promise.resolve(); } - catch (error) { + catch (err) { this.handleError(err); - Promise.reject(new Error(err)); + return Promise.reject(new Error(err)); } } + else { + return Promise.reject(new Error(" [SfuUser] Source session " + sourceId + " not found")); + } } handleError (err) { diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js b/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js index 64d8c9b76300a2c02f0231e62b36f59d21656156..eea0d58895e7424e145df30a93d7a20fe6db1a29 100644 --- a/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js +++ b/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js @@ -203,9 +203,9 @@ module.exports = class SdpWrapper { async processSdp () { let description = this._plainSdp; - if(config.get('kurento.force_low_resolution')) { - description = this.removeFmtp(description); - } + //if(config.get('kurento.force_low_resolution')) { + // description = this.removeFmtp(description); + //} description = description.toString().replace(/telephone-event/, "TELEPHONE-EVENT"); @@ -215,7 +215,7 @@ module.exports = class SdpWrapper { this.sdpSessionDescription = this.getSessionDescription(description); this.audioSdp = this.getAudioDescription(description); this.mainVideoSdp = this.getMainDescription(description); - this.mainVideoSdp = this.removeHighQualityFmtps(this.mainVideoSdp); + //this.mainVideoSdp = this.removeHighQualityFmtps(this.mainVideoSdp); this.contentVideoSdp = this.getContentDescription(description); return; diff --git a/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js index d93374abefb439c514c22c1d86f41b5f96665b2e..393ea31e0d63dd3736d9669021d866cd4316d1dc 100644 --- a/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js +++ b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js @@ -9,7 +9,7 @@ const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw'); var Screenshare = require('./screenshare'); -var C = require('../bbb/messages/Constants'); +var C = require('../bbb/messages/Constants'); // Global variables module.exports = class ScreenshareManager { @@ -24,7 +24,7 @@ module.exports = class ScreenshareManager { this._redisGateway; } - async start() { + async start() { let self = this; try { @@ -33,7 +33,7 @@ module.exports = class ScreenshareManager { this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this)); process.on('message', this._onMessage.bind(this)); console.log(' [ScreenshareManager] Successfully subscribed to redis channel'); - } + } catch (error) { console.log(' [ScreenshareManager] Could not connect to transcoder redis channel, finishing app...'); console.log(error); @@ -47,7 +47,6 @@ module.exports = class ScreenshareManager { let session; let message = _message; - console.log(message); // The sessionId is voiceBridge for screensharing sessions let sessionId = message.voiceBridge; if(this._screenshareSessions[sessionId]) { @@ -66,7 +65,6 @@ module.exports = class ScreenshareManager { self._screenshareSessions[message.voiceBridge] = session; } - //session.on('message', self._assembleSessionMessage.bind(self)); if(session) { break; } diff --git a/labs/node-bbb-apps/lib/video/video.js b/labs/node-bbb-apps/lib/video/video.js index f2c225382b4c8c16958dffcb78a190ba9a0ad43c..72c8f0f04d0a63bbc0f26b42c1a0e8c278aa8a3d 100644 --- a/labs/node-bbb-apps/lib/video/video.js +++ b/labs/node-bbb-apps/lib/video/video.js @@ -1,3 +1,4 @@ +'use strict'; // Global stuff var mediaPipelines = {}; var sharedWebcams = {}; @@ -7,6 +8,7 @@ var sharedWebcams = {}; const kurento = require('kurento-client'); const config = require('config'); const kurentoUrl = config.get('kurentoUrl'); +const MCSApi = require('../mcs-core/lib/media/MCSApiStub'); if (config.get('acceptSelfSignedCertificate')) { process.env.NODE_TLS_REJECT_UNAUTHORIZED=0; @@ -56,106 +58,199 @@ function getMediaPipeline(id, callback) { } -function Video(_ws, _id, _shared) { - - var ws = _ws; - var id = _id; - var shared = _shared; - var webRtcEndpoint = null; +module.exports = class Video { + constructor(_ws, _id, _shared) { + this.mcs = new MCSApi(); + this.ws = _ws; + this.id = _id; + this.meetingId = _id; + this.shared = _shared; + this.webRtcEndpoint = null; + this.mediaId = null; + + this.candidatesQueue = []; + } - var candidatesQueue = []; + onIceCandidate (_candidate) { + console.log(" [video] Got ICE candidate"); - this.onIceCandidate = function(_candidate) { - var candidate = kurento.getComplexType('IceCandidate')(_candidate); + //if (webRtcEndpoint) { + // webRtcEndpoint.addIceCandidate(candidate); + //} + //else { + // candidatesQueue.push(candidate); + //} - if (webRtcEndpoint) { - webRtcEndpoint.addIceCandidate(candidate); + if (this.mediaId) { + try { + this.flushCandidatesQueue(); + this.mcs.addIceCandidate(this.mediaId, _candidate); + } + catch (err) { + console.log(err); + } } else { - candidatesQueue.push(candidate); + this.candidatesQueue.push(_candidate); } }; - this.start = function(sdpOffer, callback) { + flushCandidatesQueue () { + if (this.mediaId) { + try { + while(this.candidatesQueue.length) { + let candidate = this.candidatesQueue.shift(); + this.mcs.addIceCandidate(this.mediaId, candidate); + } + } + catch (err) { + console.log(err); + } + } + } - getKurentoClient(function(error, kurentoClient) { + mediaState (event) { + let msEvent = event.event; + switch (event.eventTag) { + case "OnIceCandidate": + console.log(" [video] Sending ICE candidate to user => " + this.id); + let candidate = msEvent.candidate; + this.ws.sendMessage({ id : 'iceCandidate', cameraId: this.id, candidate : candidate }); + break; + case "MediaStateChanged": + break; + case "MediaFlowOutStateChange": + case "MediaFlowInStateChange": + console.log(' [video] ' + msEvent.type + '[' + msEvent.state + ']' + ' for endpoint ' + this.id); + if (msEvent.state === 'NOT_FLOWING') { + this.ws.sendMessage({ id : 'playStop', cameraId : this.id }); + } else if (msEvent.state === 'FLOWING') { + this.ws.sendMessage({ id : 'playStart', cameraId : this.id }); + } + break; + default: console.log(" [video] Unrecognized event"); + } + } - if (error) { - return callback(error); - } + async start (sdpOffer, callback) { + console.log(" [video] start"); + let sdpAnswer; - getMediaPipeline(id, function(error, pipeline) { + try { + this.userId = await this.mcs.join(this.meetingId, 'SFU', {}); + console.log(" [video] Join returned => " + this.userId); - if (error) { - return callback(error); - } + if (this.shared) { + const ret = await this.mcs.publish(this.userId, this.meetingId, 'WebRtcEndpoint', {descriptor: sdpOffer}); + this.mediaId = ret.sessionId; + sharedWebcams[this.id] = this.mediaId; + sdpAnswer = ret.answer; + this.flushCandidatesQueue(); + this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this)); - createMediaElements(pipeline, function(error, _webRtcEndpoint) { + console.log(" [video] Publish returned => "); + console.log(this.mediaId); - if (error) { - pipeline.release(); - return callback(error); - } + return callback(null, sdpAnswer); + } + else { + const ret = await this.mcs.subscribe(this.userId, 'WebRtcEndpoint', sharedWebcams[this.id], {descriptor: sdpOffer}); + this.mediaId = ret.sessionId; + sdpAnswer = ret.answer; + this.flushCandidatesQueue(); + this.mcs.on('MediaEvent' + this.mediaId, this.mediaState.bind(this)); - while(candidatesQueue.length) { - var candidate = candidatesQueue.shift(); - _webRtcEndpoint.addIceCandidate(candidate); - } + console.log(" [video] Subscribe returned => "); + console.log(this.mediaId); - var flowInOut = function(event) { - console.log(' [=] ' + event.type + ' for endpoint ' + id); - - if (event.state === 'NOT_FLOWING') { - ws.sendMessage({ id : 'playStop', cameraId : id }); - } else if (event.state === 'FLOWING') { - ws.sendMessage({ id : 'playStart', cameraId : id }); - } - }; - - _webRtcEndpoint.on('MediaFlowInStateChange', flowInOut); - _webRtcEndpoint.on('MediaFlowOutStateChange', flowInOut); - - connectMediaElements(_webRtcEndpoint, function(error) { - - if (error) { - pipeline.release(); - return callback(error); - } - - // It's a user sharing a webcam - if (shared) { - sharedWebcams[id] = _webRtcEndpoint; - } - - // Store our endpoint - webRtcEndpoint = _webRtcEndpoint; - - _webRtcEndpoint.on('OnIceCandidate', function(event) { - var candidate = kurento.getComplexType('IceCandidate')(event.candidate); - ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate }); - }); - - _webRtcEndpoint.processOffer(sdpOffer, function(error, sdpAnswer) { - if (error) { - pipeline.release(); - return callback(error); - } - - return callback(null, sdpAnswer); - }); - - _webRtcEndpoint.gatherCandidates(function(error) { - if (error) { - return callback(error); - } - }); - }); - }); - }); - }); + + return callback(null, sdpAnswer); + + } + } + catch (err) { + console.log(" [video] MCS returned error => " + err); + return callback(err); + } + + //getKurentoClient(function(error, kurentoClient) { + + // if (error) { + // return callback(error); + // } + + // getMediaPipeline(id, function(error, pipeline) { + + // if (error) { + // return callback(error); + // } + + // createMediaElements(pipeline, function(error, _webRtcEndpoint) { + + // if (error) { + // pipeline.release(); + // return callback(error); + // } + + // while(candidatesQueue.length) { + // let candidate = candidatesQueue.shift(); + // _webRtcEndpoint.addIceCandidate(candidate); + // } + + // var flowInOut = function(event) { + // console.log(' [=] ' + event.type + ' for endpoint ' + id); + + // if (event.state === 'NOT_FLOWING') { + // ws.sendMessage({ id : 'playStop', cameraId : id }); + // } else if (event.state === 'FLOWING') { + // ws.sendMessage({ id : 'playStart', cameraId : id }); + // } + // }; + + // _webRtcEndpoint.on('MediaFlowInStateChange', flowInOut); + // _webRtcEndpoint.on('MediaFlowOutStateChange', flowInOut); + + // connectMediaElements(_webRtcEndpoint, function(error) { + + // if (error) { + // pipeline.release(); + // return callback(error); + // } + + // // It's a user sharing a webcam + // if (shared) { + // sharedWebcams[id] = _webRtcEndpoint; + // } + + // // Store our endpoint + // webRtcEndpoint = _webRtcEndpoint; + + // _webRtcEndpoint.on('OnIceCandidate', function(event) { + // let candidate = kurento.getComplexType('IceCandidate')(event.candidate); + // ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate }); + // }); + + // _webRtcEndpoint.processOffer(sdpOffer, function(error, sdpAnswer) { + // if (error) { + // pipeline.release(); + // return callback(error); + // } + + // return callback(null, sdpAnswer); + // }); + + // _webRtcEndpoint.gatherCandidates(function(error) { + // if (error) { + // return callback(error); + // } + // }); + // }); + // }); + // }); + //}); }; - var createMediaElements = function(pipeline, callback) { + createMediaElements (pipeline, callback) { console.log(" [webrtc] Creating webrtc endpoint"); @@ -171,7 +266,7 @@ function Video(_ws, _id, _shared) { }); }; - var connectMediaElements = function(webRtcEndpoint, callback) { + connectMediaElements (webRtcEndpoint, callback) { // User is sharing webcam (sendOnly connection from the client) if (shared) { @@ -185,7 +280,7 @@ function Video(_ws, _id, _shared) { console.log(" [webrtc] User wants to receive webcam "); if (sharedWebcams[id]) { - var wRtc = sharedWebcams[id]; + let wRtc = sharedWebcams[id]; wRtc.connect(webRtcEndpoint, function(error) { @@ -199,34 +294,30 @@ function Video(_ws, _id, _shared) { } }; - this.stop = function() { + stop () { - console.log(' [stop] Releasing webrtc endpoint for ' + id); + //console.log(' [stop] Releasing webrtc endpoint for ' + id); - if (webRtcEndpoint) { - webRtcEndpoint.release(); - webRtcEndpoint = null; - } else { - console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); - } + //if (webRtcEndpoint) { + // webRtcEndpoint.release(); + // webRtcEndpoint = null; + //} else { + // console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); + //} - if (shared) { - console.log(' [stop] Webcam is shared, releasing ' + id); + //if (shared) { + // console.log(' [stop] Webcam is shared, releasing ' + id); - if (mediaPipelines[id]) { - mediaPipelines[id].release(); - } else { - console.log(" [mediaPipeline] PLEASE DONT TRY STOPPING THINGS TWICE"); - } + // if (mediaPipelines[id]) { + // mediaPipelines[id].release(); + // } else { + // console.log(" [mediaPipeline] PLEASE DONT TRY STOPPING THINGS TWICE"); + // } - delete mediaPipelines[id]; - delete sharedWebcams[id]; - } + // delete mediaPipelines[id]; + // delete sharedWebcams[id]; + //} - delete candidatesQueue; + //delete this.candidatesQueue; }; - - return this; }; - -module.exports = Video; \ No newline at end of file diff --git a/labs/node-bbb-apps/package.json b/labs/node-bbb-apps/package.json index 908d722dd200603a9f438a2bc6aeb1b05739baf2..89282209f02fa0c0b037939cc821b7906e4f95cf 100644 --- a/labs/node-bbb-apps/package.json +++ b/labs/node-bbb-apps/package.json @@ -9,7 +9,7 @@ "cookie-parser": "^1.3.5", "express": "~4.12.4", "express-session": "~1.10.3", - "kurento-client": "6.6.0", + "kurento-client": "https://github.com/Kurento/kurento-client-js#master", "moment": "*", "redis": "^2.6.2", "sdp-transform": "*", diff --git a/labs/node-bbb-apps/server.js b/labs/node-bbb-apps/server.js index e9de39af74dd1ac7b03863ced395781f8d8c8b78..a0b2e12da65cde955b7b257a7a2e115e4b63ce98 100755 --- a/labs/node-bbb-apps/server.js +++ b/labs/node-bbb-apps/server.js @@ -55,6 +55,7 @@ process.on('SIGTERM', process.exit) process.on('SIGINT', process.exit) process.on('uncaughtException', function (error) { console.log(error.stack); + process.exit('1'); });