diff --git a/labs/bbb-webrtc-sfu/lib/ProcessManager.js b/labs/bbb-webrtc-sfu/lib/ProcessManager.js index d46d6fc4b36ff1dc58e348aa58c906f5cfb0c5ad..842af90e3d59470d34e0093d1b359a0c66eaefe3 100644 --- a/labs/bbb-webrtc-sfu/lib/ProcessManager.js +++ b/labs/bbb-webrtc-sfu/lib/ProcessManager.js @@ -11,6 +11,7 @@ const cp = require('child_process'); const Logger = require('./utils/Logger'); const SCREENSHARE_PATH = './lib/screenshare/ScreenshareProcess'; const VIDEO_PATH = './lib/video/VideoProcess.js'; +const AUDIO_PATH = './lib/audio/AudioProcess.js'; module.exports = class ProcessManager { constructor() { @@ -18,26 +19,29 @@ module.exports = class ProcessManager { this.videoPid; this.screenshareProcess; this.videoProcess; + this.audioProcess; this.processes = {}; this.runningState = "RUNNING"; } async start () { let screenshareProcess = this.startProcess(SCREENSHARE_PATH); - let videoProcess = this.startProcess(VIDEO_PATH); + let audioProcess = this.startProcess(AUDIO_PATH); + this.processes[screenshareProcess.pid] = screenshareProcess; this.processes[videoProcess.pid] = videoProcess; + this.processes[audioProcess.pid] = audioProcess; process.on('SIGTERM', async () => { await this.finishChildProcesses(); - process.exit(0); + process.exit(0); }); process.on('SIGINT', async () => { await this.finishChildProcesses(); - process.exit(0); + process.exit(0); }); process.on('uncaughtException', async (error) => { diff --git a/labs/bbb-webrtc-sfu/lib/audio/AudioManager.js b/labs/bbb-webrtc-sfu/lib/audio/AudioManager.js new file mode 100755 index 0000000000000000000000000000000000000000..13c97eda7584fe317e65be6d28f2918a9fa81f8b --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/audio/AudioManager.js @@ -0,0 +1,152 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +"use strict"; + +const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw'); +const Audio = require('./audio'); +const C = require('../bbb/messages/Constants'); +const Logger = require('../utils/Logger'); + +module.exports = class AudioManager { + constructor () { + this._clientId = 0; + + this._sessions = {}; + this._audioSessions = {}; + + this._bbbGW = new BigBlueButtonGW('MANAGER'); + this._redisGateway; + } + + async start() { + try { + this._redisGateway = await this._bbbGW.addSubscribeChannel(C.TO_AUDIO); + this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this)); + } + catch (error) { + Logger.error('[AudioManager] Could not connect to audio redis channel, finishing audio app with error', error); + this.stopAll(); + } + } + + _onMessage(message) { + Logger.debug('[AudioManager] Received message [' + message.id + '] from connection', message.connectionId); + let session; + + let sessionId = message.voiceBridge.split('-')[0]; + let voiceBridge = sessionId; + let connectionId = message.connectionId; + + if(this._audioSessions[sessionId]) { + session = this._audioSessions[sessionId]; + } + + switch (message.id) { + case 'start': + + if (!session) { + session = new Audio(this._bbbGW, connectionId, voiceBridge); + } + + this._audioSessions[sessionId] = {} + this._audioSessions[sessionId] = session; + + // starts audio session by sending sessionID, websocket and sdpoffer + session.start(sessionId, connectionId, message.sdpOffer, message.callerName, (error, sdpAnswer) => { + Logger.info("[AudioManager] Started presenter ", sessionId, " for connection", connectionId); + Logger.debug("[AudioManager] SDP answer was", sdpAnswer); + if (error) { + this._bbbGW.publish(JSON.stringify({ + connectionId: connectionId, + type: 'audio', + id : 'startResponse', + response : 'rejected', + message : error + }), C.FROM_AUDIO); + return error; + } + + this._bbbGW.publish(JSON.stringify({ + connectionId: connectionId, + id : 'startResponse', + type: 'audio', + response : 'accepted', + sdpAnswer : sdpAnswer + }), C.FROM_AUDIO); + + Logger.info("[AudioManager] Sending startResponse to user", sessionId, "for connection", session._id); + }); + break; + + case 'stop': + Logger.info('[AudioManager] Received stop message for session', sessionId, "at connection", connectionId); + + if (session) { + session.stopListener(connectionId); + } else { + Logger.warn("[AudioManager] There was no audio session on stop for", sessionId); + } + break; + + case 'iceCandidate': + if (session) { + session.onIceCandidate(message.candidate, connectionId); + } else { + Logger.warn("[AudioManager] There was no audio session for onIceCandidate for", sessionId, ". There should be a queue here"); + } + break; + + case 'close': + Logger.info('[AudioManager] Connection ' + connectionId + ' closed'); + + if (typeof session !== 'undefined') { + Logger.info("[AudioManager] Stopping viewer " + sessionId); + session.stopListener(message.connectionId); + } + break; + + default: + this._bbbGW.publish(JSON.stringify({ + connectionId: connectionId, + id : 'error', + type: 'audio', + message: 'Invalid message ' + message + }), C.FROM_AUDIO); + break; + } + } + + _stopSession(sessionId) { + Logger.info('[AudioManager] Stopping session ' + sessionId); + + if (typeof this._audioSessions === 'undefined' || typeof sessionId === 'undefined') { + return; + } + + let session = this._audioSessions[sessionId]; + if(typeof session !== 'undefined' && typeof session._stop === 'function') { + session.stopAll(); + } + + delete this._audioSessions[sessionId]; + } + + stopAll() { + Logger.info('[AudioManager] Stopping everything! '); + + if (typeof this._audioSessions === 'undefined') { + return; + } + + let sessionIds = Object.keys(this._audioSessions); + + for (let i = 0; i < sessionIds.length; i++) { + this._stopSession(sessionIds[i]); + } + } +}; diff --git a/labs/bbb-webrtc-sfu/lib/audio/AudioProcess.js b/labs/bbb-webrtc-sfu/lib/audio/AudioProcess.js new file mode 100644 index 0000000000000000000000000000000000000000..47076bb9f7dfaf50bf9abd5ce69cd26c49c2bb37 --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/audio/AudioProcess.js @@ -0,0 +1,32 @@ +'use strict'; + +const AudioManager = require('./AudioManager'); +const Logger = require('../utils/Logger'); +const config = require('config'); + +if (config.get('acceptSelfSignedCertificate')) { + process.env.NODE_TLS_REJECT_UNAUTHORIZED=0; +} + +let am = new AudioManager(); + +process.on('uncaughtException', (error) => { + Logger.error('[AudioProcess] Uncaught exception ', error.stack); +}); + +process.on('disconnect', async () => { + Logger.warn('[AudioProcess] Parent process exited, cleaning things up and finishing child now...'); + //TODO below + //async AudioManager.stopAll(); + process.exit(0); +}); + +// Added this listener to identify unhandled promises, but we should start making +// sense of those as we find them +process.on('unhandledRejection', (reason, p) => { + Logger.error('[AudioProcess] Unhandled Rejection at: Promise', p, 'reason:', reason); +}); + +// This basically starts the audio Manager routines which listens the connection +// manager messages routed to "to-sfu-audio" +am.start(); diff --git a/labs/bbb-webrtc-sfu/lib/audio/audio.js b/labs/bbb-webrtc-sfu/lib/audio/audio.js new file mode 100644 index 0000000000000000000000000000000000000000..b30af02c649254c57178bc9fbdf140db1e728f89 --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/audio/audio.js @@ -0,0 +1,200 @@ +'use strict'; + +const kurento = require('kurento-client'); +const config = require('config'); +const kurentoUrl = config.get('kurentoUrl'); +const MCSApi = require('../mcs-core/lib/media/MCSApiStub'); +const C = require('../bbb/messages/Constants'); +const Logger = require('../utils/Logger'); + + +module.exports = class Audio { + constructor(_bbbGW, _id, voiceBridge) { + this.mcs = new MCSApi(); + this.bbbGW = _bbbGW; + this.id = _id; + this.voiceBridge = voiceBridge; + this.sourceAudio; + this.sourceAudioStarted = false; + this.audioEndpoints = {}; + this.role; + this.webRtcEndpoint = null; + this.userId; + + this.candidatesQueue = {} + } + + onIceCandidate (_candidate, connectionId) { + if (this.audioEndpoints[connectionId]) { + try { + this.flushCandidatesQueue(connectionId); + this.mcs.addIceCandidate(this.audioEndpoints[connectionId], _candidate); + } + catch (err) { + Logger.error("[audio] ICE candidate could not be added to media controller.", err); + } + } + else { + if(!this.candidatesQueue[connectionId]) { + this.candidatesQueue[connectionId] = []; + } + this.candidatesQueue[connectionId].push(_candidate); + } + }; + + flushCandidatesQueue (connectionId) { + if (this.audioEndpoints[connectionId]) { + try { + while(this.candidatesQueue[connectionId].length) { + let candidate = this.candidatesQueue[connectionId].shift(); + this.mcs.addIceCandidate(this.audioEndpoints[connectionId], candidate); + } + } + catch (err) { + Logger.error("[audio] ICE candidate could not be added to media controller.", err); + } + } + } + + mediaState (event) { + let msEvent = event.event; + + switch (event.eventTag) { + + case "MediaStateChanged": + break; + + default: Logger.warn("[audio] Unrecognized event"); + } + } + + mediaStateWebRtc (event, id) { + let msEvent = event.event; + + switch (event.eventTag) { + case "OnIceCandidate": + let candidate = msEvent.candidate; + Logger.debug('[audio] Received ICE candidate from mcs-core for media session', event.id, '=>', candidate); + + this.bbbGW.publish(JSON.stringify({ + connectionId: id, + id : 'iceCandidate', + type: 'audio', + cameraId: this._id, + candidate : candidate + }), C.FROM_AUDIO); + + break; + + case "MediaStateChanged": + break; + + case "MediaFlowOutStateChange": + Logger.info('[audio]', msEvent.type, '[' + msEvent.state? msEvent.state : 'UNKNOWN_STATE' + ']', 'for media session', event.id); + // TODO treat this accordingly =( (prlanzarin 05/02/2018) + + break; + + case "MediaFlowInStateChange": + break; + + default: Logger.warn("[audio] Unrecognized event", event); + } + } + + async start (sessionId, connectionId, sdpOffer, callerName, callback) { + Logger.info("[audio] Starting audio instance for", this.id); + let sdpAnswer; + + try { + if (!this.sourceAudioStarted) { + this.userId = await this.mcs.join(this.voiceBridge, 'SFU', {}); + Logger.info("[audio] MCS join for", this.id, "returned", this.userId); + + const ret = await this.mcs.publish(this.userId, + this.voiceBridge, + 'RtpEndpoint', + {descriptor: sdpOffer, adapter: 'Freeswitch', name: callerName}); + + this.sourceAudio = ret.sessionId; + this.mcs.on('MediaEvent' + this.sourceAudio, this.mediaState.bind(this)); + this.sourceAudioStarted = true; + + Logger.info("[audio] MCS publish for user", this.userId, "returned", this.sourceAudio); + } + + const retSubscribe = await this.mcs.subscribe(this.userId, + this.sourceAudio, + 'WebRtcEndpoint', + {descriptor: sdpOffer, adapter: 'Kurento'}); + + this.audioEndpoints[connectionId] = retSubscribe.sessionId; + + sdpAnswer = retSubscribe.answer; + this.flushCandidatesQueue(connectionId); + + this.mcs.on('MediaEvent' + retSubscribe.sessionId, (event) => { + this.mediaStateWebRtc(event, connectionId) + }); + + Logger.info("[audio] MCS subscribe for user", this.userId, "returned", retSubscribe.sessionId); + + return callback(null, sdpAnswer); + } + catch (err) { + Logger.error("[audio] MCS returned error => " + err); + return callback(err); + } + }; + + async stopListener(id) { + let listener = this.audioEndpoints[id]; + Logger.info('[audio] Releasing endpoints for', listener); + + if (listener) { + try { + if (this.audioEndpoints && Object.keys(this.audioEndpoints).length === 1) { + await this.mcs.leave(this.voiceBridge, this.userId); + this.sourceAudioStarted = false; + } + else { + await this.mcs.unsubscribe(this.userId, listener); + } + + delete this.candidatesQueue[id]; + delete this.audioEndpoints[id]; + + return; + } + catch (err) { + Logger.error('[audio] MCS returned error when trying to unsubscribe', err); + return; + } + } + } + + async stopAll () { + Logger.info('[audio] Releasing endpoints for user', this.userId, 'at room', this.voiceBridge); + + try { + await this.mcs.leave(this.voiceBridge, this.userId); + + for (var listener in this.audioEndpoints) { + delete this.audioEndpoints[listener]; + } + + for (var queue in this.candidatesQueue) { + delete this.candidatesQueue[queue]; + } + + this.sourceAudioStarted = false; + + Promise.resolve(); + } + catch (err) { + // TODO error handling + Promise.reject(); + } + return; + }; +}; diff --git a/labs/bbb-webrtc-sfu/lib/connection-manager/ConnectionManager.js b/labs/bbb-webrtc-sfu/lib/connection-manager/ConnectionManager.js index 7c435ad220de038dd6c39f57310da7ff8c482a0e..0f904178535d5ac6a4fa03975c78ba9093fdd960 100644 --- a/labs/bbb-webrtc-sfu/lib/connection-manager/ConnectionManager.js +++ b/labs/bbb-webrtc-sfu/lib/connection-manager/ConnectionManager.js @@ -81,6 +81,10 @@ module.exports = class ConnectionManager { this._emitter.emit('response', data); }); + audio.on(C.REDIS_MESSAGE, (data) => { + this._emitter.emit('response', data); + }); + Logger.info('[ConnectionManager] Successfully subscribed to processes redis channels'); } catch (err) { diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/AudioHandler.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/AudioHandler.js new file mode 100644 index 0000000000000000000000000000000000000000..e456e87412fec5b869a00e8bb742632a396221cb --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/AudioHandler.js @@ -0,0 +1,98 @@ +'use strict'; + +const Logger = require('../../../../utils/Logger'); +const config = require('config'); + +var kmh = function(sdp) { + this.endpointSdp = sdp; +}; +/** + * @classdesc + * Custom sipjs's MediaHandler to manage media communication between + * Kurento and Freeswitch + * @constructor + */ +kmh.prototype.AudioHandler = function (session, options ) { + this.session = session; + this.options = options; + this.Kurento; + this.sdp = null; + this.endpointSdp = null; + this.remote_sdp = null; + this.version = '0.0.1'; + + //Default video configuration + this.video = { + configuration: { + codecId: '96', + sendReceive: 'sendrecv', + rtpProfile: 'RTP/AVP', + codecName: 'H264' , + codecRate: '90000', + frameRate: '30.000000' + } + }; +}; + +/** + * Factory method for AudioHandler + * @param {Object} session Current session of this media handler + * @param {Object} options Options + * @return {AudioHandler} A AudioHandler + */ +kmh.prototype.AudioHandler.defaultFactory = function audioDefaultFactory(session, options) { + return new kmh.prototype.AudioHandler(session, options); +}; + +/** + * Setup method for this media handler. This method MUST be called before + * the SIP session starts. + * @param {Object} configuration Configuration parameters for the session + */ +kmh.prototype.AudioHandler.setup = function (sdp, rtp, kurento) { + kmh.prototype.AudioHandler.prototype.sendSdp = sdp; + kmh.prototype.AudioHandler.prototype.rtp = rtp; + kmh.prototype.AudioHandler.prototype.Kurento = kurento; + + Logger.info('[mcs-audio-handler] Setting SDP'); +}; + +kmh.prototype.AudioHandler.prototype = { + + isReady: function () { return true; }, + + close: function () { + if (this.timeout) { + clearTimeout(this.timeout); + delete this.timeout; + } + delete this.session; + }, + + render: function(){}, + mute: function(){}, + unmute: function(){}, + + getDescription: async function (onSuccess, onFailure, mediaHint) { + if(this.endpointSdp === null) { + Logger.info("[mcs-audio-handler] Processing SDP for Kurento RTP endpoint", this.rtp); + this.endpointSdp = await this.Kurento.processOffer(this.rtp, this.remote_sdp); + } + this.sdp = this.endpointSdp; + this.timeout = setTimeout(function () { + delete this.timeout; + onSuccess(this.sdp); + }.bind(this), 0); + }, + + setDescription: function (description, onSuccess, onFailure) { + Logger.debug(" [AudioHandler] Remote SDP: ", description); + this.remote_sdp = description; + this.timeout = setTimeout(function () { + delete this.timeout; + onSuccess(); + }.bind(this), 0); + } +}; + +module.exports = new kmh(); diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/freeswitch.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/freeswitch.js new file mode 100644 index 0000000000000000000000000000000000000000..f3a9e0b0fa21b71a451ccd4fcbef08e012d88813 --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/freeswitch/freeswitch.js @@ -0,0 +1,259 @@ +'use strict' + +const C = require('../../constants/Constants.js'); +const config = require('config'); +const EventEmitter = require('events').EventEmitter; +const audioHandler = require('./AudioHandler.js'); +const Logger = require('../../../../utils/Logger'); +const SIPJS = require('sip.js'); +const LOCAL_IP_ADDRESS = config.get('localIpAddress'); +const FREESWITCH_IP = config.get('freeswitch').ip; +const FREESWITCH_PORT = config.get('freeswitch').port; +const Kurento = require('../kurento'); +const isError = require('../../utils/util').isError; + +let instance = null; + +/* Public members */ +module.exports = class Freeswitch extends EventEmitter { + constructor(serverUri) { + if(!instance){ + super(); + this._serverUri = serverUri; + this._userAgents = {}; + this._sessions = {}; + this._rtpConverters = {}; + this._Kurento = new Kurento(config.get('kurentoUrl')); + instance = this; + } + + return instance; + } + + async init () { + Logger.debug("[mcs-media] freeswitch init stub"); + await this._Kurento.init(); + } + + + _getMediaServerClient (serverUri) { + return new Promise((resolve, reject) => { + }); + } + + async createMediaElement (roomId, type, params) { + if (this._userAgents[roomId]) { + return Promise.resolve(roomId); + } + try { + let userAgent = await this._createUserAgent(type, params.name, roomId); + this._userAgents[roomId] = userAgent; + return Promise.resolve(roomId); + } + catch (err) { + return Promise.reject(err); + } + } + + async connect (sourceId, sinkId, type) { + let source = this._sessions[sourceId]; + Logger.debug("[mcs-media-freeswitch] Connecting", this._rtpConverters[sourceId], "to", sinkId); + + if (source) { + return new Promise((resolve, reject) => { + switch (type) { + case 'ALL': + + case 'AUDIO': + + case 'VIDEO': + this._Kurento.connect(this._rtpConverters[sourceId], sinkId, type); + return resolve(); + break; + + default: return reject("[mcs-media] Invalid connect type"); + } + }); + } + else { + return Promise.reject("[mcs-media] Failed to connect " + type + ": " + sourceId + " to " + sinkId); + } + } + + stop (roomId, elementId) { + Logger.info("[mcs-media-freeswitch] Releasing endpoint", elementId, "from room", roomId); + let userAgent = this._userAgents[elementId]; + let rtpConverter = this._rtpConverters[elementId]; + + if (userAgent) { + Logger.info("[mcs-media-freeswitch] Stopping user agent", elementId); + userAgent.stop(); + delete this._userAgents[elementId]; + } + + if (rtpConverter) { + this._Kurento.stop(roomId, this._rtpConverters[elementId]); + delete this._rtpConverters[elementId]; + } + + return; + } + + async processOffer (elementId, sdpOffer, params) { + let userAgent = this._userAgents[elementId]; + let rtpEndpoint; + + return new Promise(async (resolve, reject) => { + try { + if (userAgent) { + + if (this._rtpConverters[elementId]) { + rtpEndpoint = this._rtpConverters[elementId]; + } + else { + rtpEndpoint = await this._Kurento.createMediaElement(elementId, 'RtpEndpoint'); + this._rtpConverters[elementId] = rtpEndpoint; + } + + Logger.info("[mcs-media-freeswitch] RTP endpoint equivalent to SIP instance is", rtpEndpoint); + + let session = this.sipCall(userAgent, + params.name, + elementId, + FREESWITCH_IP, + FREESWITCH_PORT, + rtpEndpoint + //sdpOffer + ); + + session.on('accepted', (response, cause) => { + this._sessions[elementId] = session; + return resolve(session.remote_sdp); + }); + + session.on('rejected', (response, cause) => { + Logger.info("session rejected", response, cause); + }); + + session.on('failed', (response, cause) => { + Logger.info("session failed", response, cause); + }); + + session.on('progress', (response) => { + Logger.info("session progress", response); + }); + + } else { + return reject("[mcs-media] There is no element " + elementId); + } + } + catch (error) { + this._handleError(error); + reject(error); + } + }); + } + + trackMediaState (elementId, type) { + let userAgent = this._userAgents[elementId]; + if (userAgent) { + userAgent.on('invite', function(session) { + Logger.info("[mcs-media-freeswitch] On UserAgentInvite"); + }); + + userAgent.on('message', function(message) { + Logger.info("[mcs-media-freeswitch] On UserAgentMessage", message); + }); + + userAgent.on('connected', function() { + Logger.info("[mcs-media-freeswitch] On UserAgentConnected"); + }); + + userAgent.on('disconnected', function (){ + Logger.warn("[mcs-media-freeswitch] UserAgent disconnected"); + }); + + return; + } + } + + _destroyElements() { + for (var ua in this._userAgents) { + if (this._userAgents.hasOwnProperty(ua)) { + delete this._mediaElements[ua]; + } + } + } + + _createUserAgent (type, displayName, roomId) { + var mediaFactory = audioHandler.AudioHandler.defaultFactory; + var newUA = new SIPJS.UA({ + uri: 'sip:' + C.FREESWITCH.GLOBAL_AUDIO_PREFIX + roomId + '@' + LOCAL_IP_ADDRESS, + wsServers: 'ws://' + FREESWITCH_IP + ':' + FREESWITCH_PORT, + displayName: displayName, + register: false, + mediaHandlerFactory: mediaFactory, + userAgentString: C.STRING.SIP_USER_AGENT, + log: { + builtinEnabled: false, + level: 3, + connector: this.sipjsLogConnector + }, + traceSip: true, + hackIpInContact: LOCAL_IP_ADDRESS + }); + + Logger.info("[mcs-freeswitch-adapter] Created new user agent for endpoint " + displayName); + + return newUA; + } + +/** + * Makes a sip call to a Freeswitch instance + * @param {UA} caller's SIP.js User Agent + * @param {String} username The user identifier (Kurento Endpoint ID) + * @param {String} voiceBridge The voiceBridge we are going to call to + * @param {String} host Freeswitch host address + * @param {String} port Freeswitch port + */ + sipCall (userAgent, username, voiceBridge, host, port, rtp) { + //call options + var options = { + media: { + constraints: { + audio: true, + video: false + }, + }, + inviteWithoutSdp: true, + params: { + from_displayName : username + } + }; + + audioHandler.AudioHandler.setup(null, rtp, this._Kurento); + + var sipUri = new SIPJS.URI('sip', voiceBridge, host, port); + + Logger.info('[mcs-media-freeswitch] Making SIP call to: ' + sipUri + ' from: ' + username); + + + + return userAgent.invite(sipUri, options); + } + + _handleError(error) { + // Checking if the error needs to be wrapped into a JS Error instance + if(!isError(error)) { + error = new Error(error); + } + + error.code = C.ERROR.MEDIA_SERVER_ERROR; + Logger.error('[mcs-media] Media Server returned error', error); + } + + + sipjsLogConnector (level, category, label, content) { + Logger.debug('[SIP.js] ' + content); + } +}; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento.js similarity index 98% rename from labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js rename to labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento.js index d70a7bbffed5b0663313873633af893b5d268f19..58d741bbe13335bf260176d6a8003f79e44d26b0 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/media-server.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento.js @@ -6,6 +6,7 @@ const mediaServerClient = require('kurento-client'); const util = require('util'); const EventEmitter = require('events').EventEmitter; const Logger = require('../../../utils/Logger'); +const isError = require('../utils/util').isError; let instance = null; @@ -333,10 +334,4 @@ module.exports = class MediaServer extends EventEmitter { error.code = C.ERROR.MEDIA_SERVER_ERROR; Logger.error('[mcs-media] Media Server returned error', error); } - - // duck - isError(error) { - return error && error.stack && error.message && typeof error.stack === 'string' - && typeof error.message === 'string'; - } }; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js index 3dc2693519ddb67ff00edd3453b81e812476356d..b2d3abc3db1a2dcc9012917677cfd485f05b7e24 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/constants/Constants.js @@ -54,6 +54,10 @@ exports.ERROR = {}; exports.ERROR.MEDIA_SERVER_OFFLINE = "1000"; exports.ERROR.MEDIA_SERVER_ERROR = "1001"; +// Freeswitch Adapter +exports.FREESWITCH = {}; +exports.FREESWITCH.GLOBAL_AUDIO_PREFIX = "GLOBAL_AUDIO_"; + // RTP params exports.SDP = {}; exports.SDP.PARAMS = "params" @@ -72,6 +76,11 @@ exports.SDP.FRAME_RATE = "frame_rate" // Strings exports.STRING = {} +exports.STRING.KURENTO = "Kurento" +exports.STRING.FREESWITCH = "Freeswitch" +exports.STRING.USER_AGENT = "MediaController" +exports.STRING.DEFAULT_NAME = "default" +exports.STRING.SIP_USER_AGENT = "SIP.js 0.7.8" exports.STRING.ANONYMOUS = "ANONYMOUS" exports.STRING.FS_USER_AGENT_STRING = "Freeswitch_User_Agent" exports.STRING.XML_MEDIA_FAST_UPDATE = '<?xml version=\"1.0\" encoding=\"utf-8\" ?>' + diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js index c5047b2cb852d455b7ffea0e20c5b6c604813bd6..0554112eeeb8606e2c244c4caf14c9a4959dccb6 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/media/MediaController.js @@ -76,12 +76,6 @@ module.exports = class MediaController { const user = await this.createUserMCS(roomId, type, params); room.setUser(user.id); this._users[user.id] = user; - if (params.sdp) { - session = user.addSdp(params.sdp); - } - if (params.uri) { - session = user.addUri(params.sdp); - } Logger.info("[mcs-controller] Resolving user " + user.id); return Promise.resolve(user.id); @@ -119,7 +113,7 @@ module.exports = class MediaController { } } - async publishnsubscribe (userId, sourceId, sdp, params) { + async publishnsubscribe (userId, sourceId, params) { Logger.info("[mcs-controller] PublishAndSubscribe from user", userId, "to source", sourceId); Logger.debug("[mcs-controler] PublishAndSubscribe descriptor is", params.descriptor); @@ -127,7 +121,7 @@ module.exports = class MediaController { try { user = this.getUserMCS(userId); let userId = user.id; - let session = user.addSdp(sdp, type); + let session = user.addSdp(params.descriptor, type, params.adapter, params.name); let sessionId = session.id; if (typeof this._mediaSessions[session.id] == 'undefined' || @@ -138,7 +132,7 @@ module.exports = class MediaController { this._mediaSessions[session.id] = session; const answer = await user.startSession(session.id); - await user.connect(sourceId, session.id); + await user.connect(sourceId, session); Logger.info("[mcs-controller] PublishAndSubscribe return a SDP session with ID", session.id); return Promise.resolve({userId, sessionId}); @@ -150,7 +144,7 @@ module.exports = class MediaController { } async publish (userId, roomId, type, params) { - Logger.info("[mcs-controller] Publish from user", userId, "to room", roomId); + Logger.info("[mcs-controller] Publish from user", userId, "to room", roomId, "with type", type); Logger.debug("[mcs-controler] Publish descriptor is", params.descriptor); let session; @@ -167,7 +161,7 @@ module.exports = class MediaController { switch (type) { case "RtpEndpoint": case "WebRtcEndpoint": - session = user.addSdp(params.descriptor, type); + session = user.addSdp(params.descriptor, type, params.adapter, params.name); session.on('SESSION_STOPPED', (pubId) => { Logger.info("[mcs-controller] Media session", session.id, "stopped"); if(pubId === session.id) { @@ -231,17 +225,17 @@ module.exports = class MediaController { switch (type) { case "RtpEndpoint": case "WebRtcEndpoint": - session = user.addSdp(params.descriptor, type); + session = user.addSdp(params.descriptor, type, params.adapter, params.name); answer = await user.startSession(session.id); - await sourceSession.connect(session._mediaElement); + await sourceSession.connect(session); sourceSession.subscribedSessions.push(session.id); Logger.info("[mcs-controller] Updated", sourceSession.id, "subscribers list to", sourceSession.subscribedSessions); break; case "URI": session = user.addUri(params.descriptor, type); answer = await user.startSession(session.id); - await sourceSession.connect(session._mediaElement); + await sourceSession.connect(session); break; @@ -339,7 +333,7 @@ module.exports = class MediaController { switch (type) { case C.USERS.SFU: - user = new SfuUser(roomId, type, this.emitter, params.userAgentString, params.sdp); + user = new SfuUser(roomId, type, this.emitter, params.userAgentString, params.descriptor); break; case C.USERS.MCU: Logger.info("[mcs-controller] createUserMCS MCU TODO"); diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js index 00b6d5ff37685778720ca08f888919f706a736ae..0aaa0383dd9645a012486ca7995261734a5897f7 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SdpSession.js @@ -9,27 +9,38 @@ const C = require('../constants/Constants'); const SdpWrapper = require('../utils/SdpWrapper'); const rid = require('readable-id'); const EventEmitter = require('events').EventEmitter; -const MediaServer = require('../media/media-server'); +const Kurento = require('../adapters/kurento'); +const Freeswitch = require('../adapters/freeswitch/freeswitch'); const config = require('config'); const kurentoUrl = config.get('kurentoUrl'); const Logger = require('../../../utils/Logger'); module.exports = class SdpSession extends EventEmitter { - constructor(emitter, sdp = null, room, type = 'WebRtcEndpoint') { + constructor( + emitter, + sdp = null, + room, + type = 'WebRtcEndpoint', + adapter = C.STRING.KURENTO, + name = C.STRING.DEFAULT_NAME + ) { + super(); this.id = rid(); this.room = room; this.emitter = emitter; this._status = C.STATUS.STOPPED; this._type = type; + this._name = name; // {SdpWrapper} SdpWrapper this._sdp; if (sdp && type) { this.setSdp(sdp, type); } - this._MediaServer = new MediaServer(kurentoUrl); this._mediaElement; this.subscribedSessions = []; + this._adapter = adapter; + this._MediaServer = SdpSession.getAdapter(adapter); } async setSdp (sdp, type) { @@ -39,12 +50,13 @@ module.exports = class SdpSession extends EventEmitter { async start (sdpId) { this._status = C.STATUS.STARTING; + Logger.info("[mcs-sdp-session] Starting new SDP session", this.id, "in room", this.room ); + try { const client = await this._MediaServer.init(); - - Logger.info("[mcs-sdp-session] Starting new SDP session", this.id, "in room", this.room ); - this._mediaElement = await this._MediaServer.createMediaElement(this.room, this._type); + this._mediaElement = await this._MediaServer.createMediaElement(this.room, this._type, {name: this._name}); this._MediaServer.trackMediaState(this._mediaElement, this._type); + this._MediaServer.on(C.EVENT.MEDIA_STATE.MEDIA_EVENT+this._mediaElement, (event) => { setTimeout(() => { event.id = this.id; @@ -52,12 +64,14 @@ module.exports = class SdpSession extends EventEmitter { }, 50); }); - const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp()); + const answer = await this._MediaServer.processOffer(this._mediaElement, this._sdp.getPlainSdp(), {name: this._name}); if (this._type === 'WebRtcEndpoint') { this._MediaServer.gatherCandidates(this._mediaElement); } + Logger.debug("[mcs-sdp-session] SDP session started with element ID", this._mediaElement); + return Promise.resolve(answer); } catch (err) { @@ -84,7 +98,8 @@ module.exports = class SdpSession extends EventEmitter { // TODO move to parent Session // TODO handle connection type - async connect (sinkId) { + async connect (sinkSession) { + let sinkId = sinkSession._mediaElement; try { Logger.info("[mcs-sdp-session] Connecting " + this._mediaElement + " => " + sinkId); await this._MediaServer.connect(this._mediaElement, sinkId, 'ALL'); @@ -110,6 +125,24 @@ module.exports = class SdpSession extends EventEmitter { this._MediaServer.addMediaEventListener (type, mediaId); } + static getAdapter (adapter) { + let obj = null; + + Logger.info("[SdpSession] Session is using the", adapter, "adapter"); + + switch (adapter) { + case C.STRING.KURENTO: + obj = new Kurento(kurentoUrl); + break; + case C.STRING.FREESWITCH: + obj = new Freeswitch(); + break; + default: Logger.warn("[SdpSession] Invalid adapter", this.adapter); + } + + return obj; + } + handleError (err) { Logger.error("[mcs-sdp-session] SFU SDP Session received an error", err); this._status = C.STATUS.STOPPED; diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js index 049eea9c41e6a3fe5a16cf6f4ed5b34bfe7cb9d4..600ee1e7d095d85292a37fd62d9dd4a15d11df22 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/SfuUser.js @@ -47,9 +47,9 @@ module.exports = class SfuUser extends User { } } - addSdp (sdp, type) { + addSdp (sdp, type, adapter = C.STRING.KURENTO, name = C.STRING.DEFAULT_NAME) { // TODO switch from type to children SdpSessions (WebRTC|SDP) - let session = new SdpSession(this.emitter, sdp, this.roomId, type); + let session = new SdpSession(this.emitter, sdp, this.roomId, type, adapter, name); this.emitter.emit(C.EVENT.NEW_SESSION+this.id, session.id); session.on("SESSION_STOPPED", (sessId) => { Logger.info("[mcs-sfu-user] Session ", sessId, "stopped, cleaning it..."); @@ -108,6 +108,7 @@ module.exports = class SfuUser extends User { async unsubscribe (mediaId) { try { + Logger.debug("[SfuUser] Unsubscribing from session", mediaId); await this.stopSession(mediaId); Promise.resolve(); } diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/UriSession.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/UriSession.js index de0af958e0a425573efee084fd362ae0ad4a8cdb..a492b07b91d0335e64227024a850b2a7b413af56 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/UriSession.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/model/UriSession.js @@ -8,7 +8,7 @@ const C = require('../constants/Constants'); const rid = require('readable-id'); const EventEmitter = require('events').EventEmitter; -const MediaServer = require('../media/media-server'); +const MediaServer = require('../adapters/kurento'); const Logger = require('../../../utils/Logger'); module.exports = class UriSession extends EventEmitter { diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js new file mode 100644 index 0000000000000000000000000000000000000000..e60c982613fb33d1b7c6cbd74d1239d5b79ced09 --- /dev/null +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/utils/util.js @@ -0,0 +1,11 @@ +/** + * @classdesc + * Utils class for mcs-core + * @constructor + */ + + +exports.isError = function (error) { + return error && error.stack && error.message && typeof error.stack === 'string' + && typeof error.message === 'string'; +} diff --git a/labs/bbb-webrtc-sfu/package-lock.json b/labs/bbb-webrtc-sfu/package-lock.json index cf514a459495d06898efe2924d2ba9a9883f5e4c..a5b9644a6d00ab07f8d94330ca3414adcf9880b6 100644 --- a/labs/bbb-webrtc-sfu/package-lock.json +++ b/labs/bbb-webrtc-sfu/package-lock.json @@ -1,6 +1,6 @@ { "name": "bbb-webrtc-sfu", - "version": "0.0.3", + "version": "0.0.4", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -250,6 +250,12 @@ "resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz", "integrity": "sha1-/7xJiDNuDoM94MFox+8VISGqf7M=" }, + "promiscuous": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/promiscuous/-/promiscuous-0.6.0.tgz", + "integrity": "sha1-VAFM09Ysr+gx4zVJkMBf9beMiJI=", + "optional": true + }, "promise": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/promise/-/promise-7.1.1.tgz", @@ -306,6 +312,32 @@ "resolved": "https://registry.npmjs.org/sdp-transform/-/sdp-transform-2.3.0.tgz", "integrity": "sha1-V6lXWUIEHYV3qGnXx01MOgvYiPY=" }, + "sip.js": { + "version": "0.7.5", + "resolved": "https://registry.npmjs.org/sip.js/-/sip.js-0.7.5.tgz", + "integrity": "sha1-hqznBRWU+RtFUb24EgoWxEli06I=", + "requires": { + "promiscuous": "0.6.0", + "ws": "0.6.5" + }, + "dependencies": { + "nan": { + "version": "1.4.3", + "resolved": "https://registry.npmjs.org/nan/-/nan-1.4.3.tgz", + "integrity": "sha1-xWtUBGmAY2lvWXQ1+RY8MSrqUAk=" + }, + "ws": { + "version": "0.6.5", + "resolved": "https://registry.npmjs.org/ws/-/ws-0.6.5.tgz", + "integrity": "sha1-8AhEAByjk7ADaB/zKDjnKlYNr9Q=", + "requires": { + "nan": "1.4.3", + "options": "0.0.6", + "ultron": "1.0.2" + } + } + } + }, "sprintf-js": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/sprintf-js/-/sprintf-js-1.0.3.tgz", diff --git a/labs/bbb-webrtc-sfu/package.json b/labs/bbb-webrtc-sfu/package.json index 3f4894a2a4073798e3ceb581ea3d04aa3bfda6c7..1d8092e1e733022712770ff43aac1e5446174209 100644 --- a/labs/bbb-webrtc-sfu/package.json +++ b/labs/bbb-webrtc-sfu/package.json @@ -1,6 +1,6 @@ { "name": "bbb-webrtc-sfu", - "version": "0.0.3", + "version": "0.0.4", "private": true, "scripts": { "start": "node server.js" @@ -15,7 +15,8 @@ "ws": "^3.3.2", "config": "^1.26.1", "js-yaml": "^3.8.3", - "winston": "^2.4.0" + "winston": "^2.4.0", + "sip.js": "0.7.5" }, "optionalDependencies": {} }