diff --git a/labs/node-bbb-apps/README.md b/labs/node-bbb-apps/README.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/labs/node-bbb-apps/debug-start.sh b/labs/node-bbb-apps/debug-start.sh new file mode 100644 index 0000000000000000000000000000000000000000..b7c6990b40943deef6c8529bfec832f0d929927b --- /dev/null +++ b/labs/node-bbb-apps/debug-start.sh @@ -0,0 +1 @@ +node --inspect --debug-brk server.js diff --git a/labs/node-bbb-apps/keys/README.md b/labs/node-bbb-apps/keys/README.md new file mode 100644 index 0000000000000000000000000000000000000000..5bc681a1c8d2ece88651b6ee63d410536eae50f6 --- /dev/null +++ b/labs/node-bbb-apps/keys/README.md @@ -0,0 +1,2 @@ +This folder contains a dummy self-signed certificate only for demo purposses, +**DON'T USE IT IN PRODUCTION**. diff --git a/labs/node-bbb-apps/lib/bbb/messages/Constants.js b/labs/node-bbb-apps/lib/bbb/messages/Constants.js new file mode 100644 index 0000000000000000000000000000000000000000..a72ae88639b8c9f868120d830ad7a49d5aac452e --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/Constants.js @@ -0,0 +1,99 @@ +"use strict"; +/** + * @classdesc + * Message constants for the communication with BigBlueButton + * @constructor + */ + function Constants () { + return { + // Media elements + WebRTC: "WebRtcEndpoint", + RTP: "RtpEndpoint", + AUDIO: "AUDIO", + VIDEO: "VIDEO", + ALL: "ALL", + + // Redis channels + FROM_BBB_TRANSCODE_SYSTEM_CHAN : "bigbluebutton:from-bbb-transcode:system", + FROM_VOICE_CONF_SYSTEM_CHAN: "from-voice-conf-redis-channel", + TO_BBB_TRANSCODE_SYSTEM_CHAN: "bigbluebutton:to-bbb-transcode:system", + FROM_SCREENSHARE: "from-screenshare-redis-channel", + TO_SCREENSHARE: "to-screenshare-redis-channel", + FROM_VIDEO: "from-video-redis-channel", + TO_VIDEO: "to-video-redis-channel", + FROM_AUDIO: "from-audio-redis-channel", + TO_AUDIO: "to-audio-redis-channel", + + // RedisWrapper events + REDIS_MESSAGE : "redis_message", + WEBSOCKET_MESAGE: "ws_message", + GATEWAY_MESSAGE: "gateway_message", + + // Message identifiers 1x + START_TRANSCODER_REQUEST: "start_transcoder_request_message", + START_TRANSCODER_REPLY: "start_transcoder_reply_message", + STOP_TRANSCODER_REQUEST: "stop_transcoder_request_message", + STOP_TRANSCODER_REPLY: "stop_transcoder_reply_message", + DESKSHARE_RTMP_BROADCAST_STARTED: "deskshare_rtmp_broadcast_started_message", + DESKSHARE_RTMP_BROADCAST_STOPPED: "deskshare_rtmp_broadcast_stopped_message", + + //Message identifiers 2x + SCREENSHARE_RTMP_BROADCAST_STARTED_2x: "ScreenshareRtmpBroadcastStartedVoiceConfEvtMsg", + SCREENSHARE_RTMP_BROADCAST_STOPPED_2x: "ScreenshareRtmpBroadcastStoppedVoiceConfEvtMsg", + START_TRANSCODER_REQ_2x: "StartTranscoderSysReqMsg", + START_TRANSCODER_RESP_2x: "StartTranscoderSysRespMsg", + STOP_TRANSCODER_REQ_2x: "StopTranscoderSysReqMsg", + STOP_TRANSCODER_RESP_2x: "StopTranscoderSysRespMsg", + + // Redis messages fields + // Transcoder 1x + USER_ID : "user_id", + OPTIONS: "options", + VOICE_CONF_ID : "voice_conf_id", + TRANSCODER_ID : "transcoder_id", + + // Transcoder 2x + USER_ID_2x : "userId", + TRANSCODER_ID_2x : "transcoderId", + MEETING_ID_2x: "meetingId", + + // Screenshare 2x + CONFERENCE_NAME: "voiceConf", + SCREENSHARE_CONF: "screenshareConf", + STREAM_URL: "stream", + TIMESTAMP: "timestamp", + VIDEO_WIDTH: "vidWidth", + VIDEO_HEIGHT: "vidHeight", + + // RTP params + MEETING_ID : "meeting_id", + VOICE_CONF : "voice_conf", + KURENTO_ENDPOINT_ID : "kurento_endpoint_id", + PARAMS : "params", + MEDIA_DESCRIPTION: "media_description", + LOCAL_IP_ADDRESS: "local_ip_address", + LOCAL_VIDEO_PORT: "local_video_port", + DESTINATION_IP_ADDRESS : "destination_ip_address", + DESTINATION_VIDEO_PORT : "destination_video_port", + REMOTE_VIDEO_PORT : "remote_video_port", + CODEC_NAME: "codec_name", + CODEC_ID: "codec_id", + CODEC_RATE: "codec_rate", + RTP_PROFILE: "rtp_profile", + SEND_RECEIVE: "send_receive", + FRAME_RATE: "frame_rate", + INPUT: "input", + KURENTO_TOKEN : "kurento_token", + SCREENSHARE: "deskShare", + STREAM_TYPE: "stream_type", + STREAM_TYPE_SCREENSHARE: "stream_type_deskshare", + STREAM_TYPE_VIDEO: "stream_type_video", + RTP_TO_RTMP: "transcode_rtp_to_rtmp", + TRANSCODER_CODEC: "codec", + TRANSCODER_TYPE: "transcoder_type", + CALLERNAME: "callername" + } +} + +module.exports = Constants(); + diff --git a/labs/node-bbb-apps/lib/bbb/messages/Messaging.js b/labs/node-bbb-apps/lib/bbb/messages/Messaging.js new file mode 100644 index 0000000000000000000000000000000000000000..a352acf1e5f8cfd736e48011316209995bf2a87d --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/Messaging.js @@ -0,0 +1,68 @@ +const Constants = require('./Constants.js'); + +// Messages + +let OutMessage = require('./OutMessage.js'); + +let StartTranscoderRequestMessage = + require('./transcode/StartTranscoderRequestMessage.js')(Constants); +let StopTranscoderRequestMessage = + require('./transcode/StopTranscoderRequestMessage.js')(Constants); +let StartTranscoderSysReqMsg = + require('./transcode/StartTranscoderSysReqMsg.js')(); +let StopTranscoderSysReqMsg = + require('./transcode/StopTranscoderSysReqMsg.js')(); +let DeskShareRTMPBroadcastStartedEventMessage = + require('./screenshare/DeskShareRTMPBroadcastStartedEventMessage.js')(Constants); +let DeskShareRTMPBroadcastStoppedEventMessage = + require('./screenshare/DeskShareRTMPBroadcastStoppedEventMessage.js')(Constants); +let ScreenshareRTMPBroadcastStartedEventMessage2x = + require('./screenshare/ScreenshareRTMPBroadcastStartedEventMessage2x.js')(Constants); +let ScreenshareRTMPBroadcastStoppedEventMessage2x = + require('./screenshare/ScreenshareRTMPBroadcastStoppedEventMessage2x.js')(Constants); + + + /** + * @classdesc + * Messaging utils to assemble JSON/Redis BigBlueButton messages + * @constructor + */ +function Messaging() {} + +Messaging.prototype.generateStartTranscoderRequestMessage = + function(meetingId, transcoderId, params) { + let statrm = new StartTranscoderSysReqMsg(meetingId, transcoderId, params); + return statrm.toJson(); +} + +Messaging.prototype.generateStopTranscoderRequestMessage = + function(meetingId, transcoderId) { + let stotrm = new StopTranscoderSysReqMsg(meetingId, transcoderId); + return stotrm.toJson(); +} + +Messaging.prototype.generateDeskShareRTMPBroadcastStartedEvent = + function(conferenceName, streamUrl, vw, vh, timestamp) { + let stadrbem = new DeskShareRTMPBroadcastStartedEventMessage(conferenceName, streamUrl, vw, vh, timestamp); + return stadrbem.toJson(); +} + +Messaging.prototype.generateDeskShareRTMPBroadcastStoppedEvent = + function(conferenceName, streamUrl, vw, vh, timestamp) { + let stodrbem = new DeskShareRTMPBroadcastStoppedEventMessage(conferenceName, streamUrl, vw, vh, timestamp); + return stodrbem.toJson(); +} + +Messaging.prototype.generateScreenshareRTMPBroadcastStartedEvent2x = + function(conferenceName, screenshareConf, streamUrl, vw, vh, timestamp) { + let stadrbem = new ScreenshareRTMPBroadcastStartedEventMessage2x(conferenceName, screenshareConf, streamUrl, vw, vh, timestamp); + return stadrbem.toJson(); +} + +Messaging.prototype.generateScreenshareRTMPBroadcastStoppedEvent2x = + function(conferenceName, screenshareConf, streamUrl, vw, vh, timestamp) { + let stodrbem = new ScreenshareRTMPBroadcastStoppedEventMessage2x(conferenceName, screenshareConf, streamUrl, vw, vh, timestamp); + return stodrbem.toJson(); +} + +module.exports = new Messaging(); diff --git a/labs/node-bbb-apps/lib/bbb/messages/OutMessage.js b/labs/node-bbb-apps/lib/bbb/messages/OutMessage.js new file mode 100644 index 0000000000000000000000000000000000000000..04776dca76e6513f20259b2a47c339f2c175c5e3 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/OutMessage.js @@ -0,0 +1,35 @@ +/* + * (C) Copyright 2016 Mconf Tecnologia (http://mconf.com/) + */ + +/** + * @classdesc + * Base class for output messages sent to BBB + * @constructor + */ +function OutMessage(messageName) { + /** + * The header template of the message + * @type {Object} + */ + this.header = { + version: "0.0.1", + name: messageName + }; + + /** + * The payload of the message + * @type {Object} + */ + this.payload = null; + + /** + * Generates the JSON representation of the message + * @return {String} The JSON string of this message + */ + this.toJson = function () { + return JSON.stringify(this); + } +}; + +module.exports = OutMessage; diff --git a/labs/node-bbb-apps/lib/bbb/messages/OutMessage2x.js b/labs/node-bbb-apps/lib/bbb/messages/OutMessage2x.js new file mode 100644 index 0000000000000000000000000000000000000000..a6f9cafcfcd289d2ef95334743fb5c32f63bd8f6 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/OutMessage2x.js @@ -0,0 +1,52 @@ +/* + * (C) Copyright 2016 Mconf Tecnologia (http://mconf.com/) + */ + +/** + * @classdesc + * Base class for output messages sent to BBB + * 2x model + * @constructor + */ +function OutMessage2x(messageName, routing, headerFields) { + + + this.envelope = { + name: messageName, + routing: routing + } + /** + * The header template of the message + * @type {Object} + */ + this.core = { + header : { + name: messageName + } + } + + // Copy header fiels to the header object + var keys1 = Object.keys(headerFields); + for (var k=0; k < keys1.length; k++) { + var key = keys1[k]; + if (typeof this.core.header[key] === 'undefined') { + this.core.header[key] = headerFields[key]; + } + } + + /** + * The body of the message + * @type {Object} + */ + this.core.body = null; + + /** + * Generates the JSON representation of the message + * @return {String} The JSON string of this message + */ + this.toJson = function () { + return JSON.stringify(this); + } +}; + +module.exports = OutMessage2x; diff --git a/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStartedEventMessage.js b/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStartedEventMessage.js new file mode 100644 index 0000000000000000000000000000000000000000..34579de0bfa0854f7170d1e92efc7cb818b2cbee --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStartedEventMessage.js @@ -0,0 +1,22 @@ +/* + * + */ + +var inherits = require('inherits'); +var OutMessage = require('../OutMessage'); + +module.exports = function (Constants) { + function DeskShareRTMPBroadcastStartedEventMessage (conferenceName, streamUrl, vw, vh, timestamp) { + DeskShareRTMPBroadcastStartedEventMessage.super_.call(this, Constants.DESKSHARE_RTMP_BROADCAST_STARTED); + + this.payload = {}; + this.payload[Constants.CONFERENCE_NAME] = conferenceName; + this.payload[Constants.STREAM_URL] = streamUrl; + this.payload[Constants.TIMESTAMP] = timestamp; + this.payload[Constants.VIDEO_WIDTH] = vw; + this.payload[Constants.VIDEO_HEIGHT] = vh; + }; + + inherits(DeskShareRTMPBroadcastStartedEventMessage, OutMessage); + return DeskShareRTMPBroadcastStartedEventMessage; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStoppedEventMessage.js b/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStoppedEventMessage.js new file mode 100644 index 0000000000000000000000000000000000000000..81e7125db7f9d7b61580f4dad93076387b6c8426 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/screenshare/DeskShareRTMPBroadcastStoppedEventMessage.js @@ -0,0 +1,22 @@ +/* + * + */ + +var inherits = require('inherits'); +var OutMessage = require('../OutMessage'); + +module.exports = function (Constants) { + function DeskShareRTMPBroadcastStoppedEventMessage (conferenceName, streamUrl, vw, vh, timestamp) { + DeskShareRTMPBroadcastStoppedEventMessage.super_.call(this, Constants.DESKSHARE_RTMP_BROADCAST_STOPPED); + + this.payload = {}; + this.payload[Constants.CONFERENCE_NAME] = conferenceName; + this.payload[Constants.STREAM_URL] = streamUrl; + this.payload[Constants.TIMESTAMP] = timestamp; + this.payload[Constants.VIDEO_WIDTH] = vw; + this.payload[Constants.VIDEO_HEIGHT] = vh; + }; + + inherits(DeskShareRTMPBroadcastStoppedEventMessage, OutMessage); + return DeskShareRTMPBroadcastStoppedEventMessage; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStartedEventMessage2x.js b/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStartedEventMessage2x.js new file mode 100644 index 0000000000000000000000000000000000000000..28867a2265f3a1d6393ebadc4936cc785e0be651 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStartedEventMessage2x.js @@ -0,0 +1,25 @@ +/* + * + */ + +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); + +module.exports = function (C) { + function ScreenshareRTMPBroadcastStartedEventMessage2x (conferenceName, screenshareConf, + streamUrl, vw, vh, timestamp) { + ScreenshareRTMPBroadcastStartedEventMessage2x.super_.call(this, C.SCREENSHARE_RTMP_BROADCAST_STARTED_2x, + {voiceConf: conferenceName}, {voiceConf: conferenceName}); + + this.core.body = {}; + this.core.body[C.CONFERENCE_NAME] = conferenceName; + this.core.body[C.SCREENSHARE_CONF] = screenshareConf; + this.core.body[C.STREAM_URL] = streamUrl; + this.core.body[C.VIDEO_WIDTH] = vw; + this.core.body[C.VIDEO_HEIGHT] = vh; + this.core.body[C.TIMESTAMP] = timestamp; + }; + + inherits(ScreenshareRTMPBroadcastStartedEventMessage2x, OutMessage2x); + return ScreenshareRTMPBroadcastStartedEventMessage2x; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStoppedEventMessage2x.js b/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStoppedEventMessage2x.js new file mode 100644 index 0000000000000000000000000000000000000000..d02dfb2cd3a335518cf5a207880cbb4b17e8cf64 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/screenshare/ScreenshareRTMPBroadcastStoppedEventMessage2x.js @@ -0,0 +1,25 @@ +/* + * + */ + +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); + +module.exports = function (C) { + function ScreenshareRTMPBroadcastStoppedEventMessage2x (conferenceName, screenshareConf, + streamUrl, vw, vh, timestamp) { + ScreenshareRTMPBroadcastStoppedEventMessage2x.super_.call(this, C.SCREENSHARE_RTMP_BROADCAST_STOPPED_2x, + {voiceConf: conferenceName}, {voiceConf: conferenceName}); + + this.core.body = {}; + this.core.body[C.CONFERENCE_NAME] = conferenceName; + this.core.body[C.SCREENSHARE_CONF] = screenshareConf; + this.core.body[C.STREAM_URL] = streamUrl; + this.core.body[C.VIDEO_WIDTH] = vw; + this.core.body[C.VIDEO_HEIGHT] = vh; + this.core.body[C.TIMESTAMP] = timestamp; + }; + + inherits(ScreenshareRTMPBroadcastStoppedEventMessage2x, OutMessage2x); + return ScreenshareRTMPBroadcastStoppedEventMessage2x; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderRequestMessage.js b/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderRequestMessage.js new file mode 100644 index 0000000000000000000000000000000000000000..69d5f0890f7c38d94a62e99e41cd7672ba8269fe --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderRequestMessage.js @@ -0,0 +1,20 @@ +/* + * + */ + +var inherits = require('inherits'); +var OutMessage = require('../OutMessage'); + +module.exports = function (Constants) { + function StartTranscoderRequestMessage (meetingId, transcoderId, params) { + StartTranscoderRequestMessage.super_.call(this, Constants.START_TRANSCODER_REQUEST); + + this.payload = {}; + this.payload[Constants.MEETING_ID] = meetingId; + this.payload[Constants.TRANSCODER_ID] = transcoderId; + this.payload[Constants.PARAMS] = params; + }; + + inherits(StartTranscoderRequestMessage, OutMessage); + return StartTranscoderRequestMessage; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js b/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js new file mode 100644 index 0000000000000000000000000000000000000000..bd517a14905d51effe3abdc28987d3bca2b91a82 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/transcode/StartTranscoderSysReqMsg.js @@ -0,0 +1,18 @@ +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); +var C = require('../Constants'); + +module.exports = function() { + function StartTranscoderSysReqMsg(meetingId, transcoderId, params) { + StartTranscoderSysReqMsg.super_.call(this, C.START_TRANSCODER_REQ_2x, + {sender: "kurento-screenshare"}, + {meetingId: meetingId}); + + this.core.body = {}; + this.core.body[C.TRANSCODER_ID_2x] = transcoderId; + this.core.body[C.PARAMS] = params; + }; + + inherits(StartTranscoderSysReqMsg, OutMessage2x); + return StartTranscoderSysReqMsg; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderRequestMessage.js b/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderRequestMessage.js new file mode 100644 index 0000000000000000000000000000000000000000..ad030d2c623252d385c8298bdf79c112caf12623 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderRequestMessage.js @@ -0,0 +1,15 @@ +var inherits = require('inherits'); +var OutMessage = require('../OutMessage'); + +module.exports = function (Constants) { + function StopTranscoderRequestMessage (meetingId, transcoderId) { + StopTranscoderRequestMessage.super_.call(this, Constants.STOP_TRANSCODER_REQUEST); + + this.payload = {}; + this.payload[Constants.MEETING_ID] = meetingId; + this.payload[Constants.TRANSCODER_ID] = transcoderId; + }; + + inherits(StopTranscoderRequestMessage, OutMessage); + return StopTranscoderRequestMessage; +} diff --git a/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js b/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js new file mode 100644 index 0000000000000000000000000000000000000000..639b415792b643ff20bfa47ad523bf4e72f27d5a --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/messages/transcode/StopTranscoderSysReqMsg.js @@ -0,0 +1,17 @@ +var inherits = require('inherits'); +var OutMessage2x = require('../OutMessage2x'); +var C = require('../Constants'); + +module.exports = function() { + function StopTranscoderSysReqMsg(meetingId, transcoderId) { + StopTranscoderSysReqMsg.super_.call(this, C.STOP_TRANSCODER_REQ_2x, + {sender: "kurento-screenshare"}, + {meetingId: meetingId}); + + this.core.body = {}; + this.core.body[C.TRANSCODER_ID_2x] = transcoderId; + }; + + inherits(StopTranscoderSysReqMsg, OutMessage2x); + return StopTranscoderSysReqMsg; +} diff --git a/labs/node-bbb-apps/lib/bbb/pubsub/RedisWrapper.js b/labs/node-bbb-apps/lib/bbb/pubsub/RedisWrapper.js new file mode 100644 index 0000000000000000000000000000000000000000..1f2a3fe2134440e14509f4a78d908c5563067f63 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/pubsub/RedisWrapper.js @@ -0,0 +1,119 @@ +/** + * @classdesc + * Redis wrapper class for connecting to Redis channels + */ + +'use strict'; + +/* Modules */ + +const redis = require('redis'); +const config = require('config'); +const Constants = require('../messages/Constants.js'); +const EventEmitter = require('events').EventEmitter; + +/* Public members */ + +module.exports = class RedisWrapper extends EventEmitter { + constructor(subpattern) { + super(); + // Redis PubSub client holders + this.redisCli = null; + this.redisPub = null; + // Pub and Sub channels/patterns + this.subpattern = subpattern; + } + + static get _retryThreshold() { + return 1000 * 60 * 60; + } + + static get _maxRetries() { + return 10; + } + + startPublisher () { + var options = { + host : config.get('redisHost'), + port : config.get('redisPort'), + //password: config.get('redis.password') + retry_strategy: this._redisRetry + }; + + this.redisPub = redis.createClient(options); + } + + startSubscriber () { + let self = this; + if (this.redisCli) { + console.log(" [RedisWrapper] Redis Client already exists"); + return; + } + + var options = { + host : config.get('redisHost'), + port : config.get('redisPort'), + //password: config.get('redis.password') + retry_strategy: this._redisRetry + }; + + this.redisCli = redis.createClient(options); + + console.log(" [RedisWrapper] Trying to subscribe to redis channel"); + + this.redisCli.on("psubscribe", (channel, count) => { + console.log(" [RedisWrapper] Successfully subscribed to pattern [" + channel + "]"); + }); + + this.redisCli.on("pmessage", this._onMessage.bind(this)); + + if (!this.subpattern) { + throw new Error("[RedisWrapper] No subscriber pattern"); + } + + this.redisCli.psubscribe(this.subpattern); + + console.log(" [RedisWrapper] Started Redis client at " + options.host + ":" + options.port + + " for subscription pattern: " + this.subpattern); + + return ; + } + + stopRedis (callback) { + if (this.redisCli){ + this.redisCli.quit(); + } + callback(false); + } + + publishToChannel (_message, channel) { + let message = _message; + if(this.redisPub) { + console.log(" [RedisWrapper] Sending message to channel [" + channel + "] : " + message ); + console.log(message); + this.redisPub.publish(channel, message); + } + } + + /* Private members */ + + _onMessage (pattern, channel, _message) { + let message = (typeof _message !== 'object')?JSON.parse(_message):_message; + console.log(" [RedisWrapper] Message received from channel [" + channel + "] : " + message); + // use event emitter to throw new message + this.emit(Constants.REDIS_MESSAGE, message); + } + + static _redisRetry (options) { + if (options.error && options.error.code === 'ECONNREFUSED') { + return new Error('The server refused the connection'); + } + if (options.total_retry_time > RedisWrapper._retryThreshold) { + return new Error('Retry time exhausted'); + } + if (options.times_connected > RedisWrapper._maxRetries) { + return undefined; + } + return Math.max(options.attempt * 100, 3000); + } +} diff --git a/labs/node-bbb-apps/lib/bbb/pubsub/bbb-gw.js b/labs/node-bbb-apps/lib/bbb/pubsub/bbb-gw.js new file mode 100644 index 0000000000000000000000000000000000000000..2e606a935949a41ecf1e0c0278dd6e9cea9ffc07 --- /dev/null +++ b/labs/node-bbb-apps/lib/bbb/pubsub/bbb-gw.js @@ -0,0 +1,124 @@ +/** + * @classdesc + * BigBlueButton redis gateway for bbb-screenshare node app + */ + +'use strict'; + +/* Modules */ + +const C = require('../messages/Constants.js'); +const RedisWrapper = require('./RedisWrapper.js'); +const config = require('config'); +const util = require('util'); +const EventEmitter = require('events').EventEmitter; + +let instance = null; + +module.exports = class BigBlueButtonGW { + constructor(emitter) { + if(!instance){ + this.emitter = emitter; + this.subscribers = {}; + this.publisher = null; + instance = this; + } + + return instance; + } + + addSubscribeChannel (channel) { + if (this.subscribers[channel]) { + return this.subscribers[channel]; + } + + let wrobj = new RedisWrapper(channel); + this.subscribers[channel] = {}; + this.subscribers[channel] = wrobj; + try { + wrobj.startSubscriber(); + wrobj.on(C.REDIS_MESSAGE, this.incomingMessage); + console.log(" [BigBlueButtonGW] Added redis client to this.subscribers[" + channel + "]"); + return Promise.resolve(wrobj); + } + catch (error) { + return Promise.reject(" [BigBlueButtonGW] Could not start redis client for channel " + channel); + } + } + + /** + * Capture messages from subscribed channels and emit an event with it's + * identifier and payload. Check Constants.js for the identifiers. + * + * @param {Object} message Redis message + */ + incomingMessage (message) { + let header; + let payload; + let msg = (typeof message !== 'object')?JSON.parse(message):message; + + // Trying to parse both message types, 1x and 2x + if (msg.header) { + header = msg.header; + payload = msg.payload; + } + else if (msg.core) { + header = msg.core.header; + payload = msg.core.body; + } + + if (header){ + switch (header.name) { + // interoperability with 1.1 + case C.START_TRANSCODER_REPLY: + this.emit(C.START_TRANSCODER_REPLY, payload); + break; + case C.STOP_TRANSCODER_REPLY: + this.emit(C.STOP_TRANSCODER_REPLY, payload); + break; + // 2x messages + case C.START_TRANSCODER_RESP_2x: + payload[C.MEETING_ID_2x] = header[C.MEETING_ID_2x]; + + this.emit(C.START_TRANSCODER_RESP_2x, payload); + break; + case C.STOP_TRANSCODER_RESP_2x: + payload[C.MEETING_ID_2x] = header[C.MEETING_ID_2x]; + this.emit(C.STOP_TRANSCODER_RESP_2x, payload); + break; + + default: + console.log(" [BigBlueButtonGW] Unknown Redis message with ID =>" + header.name); + this.emit(C.GATEWAY_MESSAGE, msg); + } + } + else { + console.log(" [BigBlueButtonGW] Unknown Redis message =>"); + this.emit(C.GATEWAY_MESSAGE, msg); + } + } + + publish (message, channel) { + if (!this.publisher) { + this.publisher = new RedisWrapper(); + this.publisher.startPublisher(); + } + + if (typeof this.publisher.publishToChannel === 'function') { + this.publisher.publishToChannel(message, channel); + } + } + + setEventEmitter (emitter) { + this.emitter = emitter; + } + + _onServerResponse(data) { + + console.log(' [WS] Receiving event '); + console.log(data); + + // Here this is the 'ws' instance + this.sendMessage(data); + } +} diff --git a/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js b/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js new file mode 100644 index 0000000000000000000000000000000000000000..886c5eb301493dbee54428cbddb98394a65b1a40 --- /dev/null +++ b/labs/node-bbb-apps/lib/connection-manager/ConnectionManager.js @@ -0,0 +1,87 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +'use strict'; + +// const express = require('express'); +// const session = require('express-session') +// const wsModule = require('./websocket'); + +const http = require('http'); +const fs = require('fs'); +const EventEmitter = require('events'); +const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw'); +const C = require('../bbb/messages/Constants'); + +// Global variables +module.exports = class ConnectionManager { + + constructor (settings, logger) { + this._logger = logger; + this._screenshareSessions = {}; + + this._emitter = this._setupEventEmitter(); + this._adapters = []; + + this._setupBBB(); + } + + setHttpServer(httpServer) { + this.httpServer = httpServer; + } + + listen(callback) { + this.httpServer.listen(callback); + } + + addAdapter(adapter) { + adapter.setEventEmitter(this._emitter); + this._adapters.push(adapter); + } + + _setupEventEmitter() { + let self = this; + let emitter = new EventEmitter(); + + emitter.on(C.WEBSOCKET_MESSAGE, (data) => { + console.log(" [ConnectionManager] RECEIVED DATA FROM WEBSOCKET"); + self._bbbGW.publish(JSON.stringify(data), C.TO_SCREENSHARE); + }); + + emitter.on(C.REDIS_MESSAGE, (data) => { + console.log(" [ConnectionManager] RECEIVED DATA FROM REDIS"); + emitter.emit('response', data); + }); + + return emitter; + } + + _setupBBB() { + this._bbbGW = new BigBlueButtonGW(this._emitter); + + try { + const transcode = this._bbbGW.addSubscribeChannel(C.FROM_BBB_TRANSCODE_SYSTEM_CHAN); + const screenshare = this._bbbGW.addSubscribeChannel(C.FROM_SCREENSHARE); + const video = this._bbbGW.addSubscribeChannel(C.FROM_VIDEO); + const audio = this._bbbGW.addSubscribeChannel(C.FROM_AUDIO); + + console.log(' [ConnectionManager] Successfully subscribed to processes redis channels'); + } + catch (err) { + console.log(' [ConnectionManager] ' + err); + this._stopAll; + } + } + + _stopSession(sessionId) { + } + + _stopAll() { + } +} + +module.exports.events = ['startWebRtc', '']; diff --git a/labs/node-bbb-apps/lib/connection-manager/HttpServer.js b/labs/node-bbb-apps/lib/connection-manager/HttpServer.js new file mode 100644 index 0000000000000000000000000000000000000000..bd6dcd42f1eef90a0dd5ae1cb7468dd14f6ddcd7 --- /dev/null +++ b/labs/node-bbb-apps/lib/connection-manager/HttpServer.js @@ -0,0 +1,29 @@ +"use strict"; + +const http = require("http"); +const fs = require("fs"); + +module.exports = class HttpServer { + + constructor() { + //const privateKey = fs.readFileSync('sslcert/server.key', 'utf8'); + //const certificate = fs.readFileSync('sslcert/server.crt', 'utf8'); + //const credentials = {key: privateKey, cert: certificate}; + + this.port = 3008; + + this.server = http.createServer((req,res) => { + // + }); + } + + getServerObject() { + return this.server; + } + + listen(callback) { + console.log(' [HttpServer] Listening in port ' + this.port); + this.server.listen(this.port, callback); + } + +} diff --git a/labs/node-bbb-apps/lib/connection-manager/MessageValidator.js b/labs/node-bbb-apps/lib/connection-manager/MessageValidator.js new file mode 100644 index 0000000000000000000000000000000000000000..84022e268838c571893d3bf0c2b6f5d092584dbf --- /dev/null +++ b/labs/node-bbb-apps/lib/connection-manager/MessageValidator.js @@ -0,0 +1,81 @@ +const Joi = require('joi'); + +let instance = null; + +module.exports = class MessageParser { + constructor() { + if(!instance){ + instance = this; + } + return instance; + } + + static const schema { + startScreenshare: Joi.object().keys({ + sdpOffer : Joi.string().required(), + vh: Joi.number().required(), + vw: Joi.number().required() + }), + + startVideo: Joi.object().keys({ + internalMeetingId: joi.string().required(), + callerName : Joi.string().required(), + }), + + startAudio: Joi.object().keys({ + internalMeetingId: joi.string().required(), + callerName : Joi.string().required(), + }), + + playStart: Joi.object().keys({ + }), + + playStop: Joi.object().keys.({ + }), + + stop: Joi.object().keys({ + }), + + onIceCandidate: Joi.object().keys({ + internalMeetingId: joi.string().required(), + candidate: Joi.object().required(), + }), + } + + static const messageTemplate Joi.object().keys({ + id: Joi.string().required(), + type: joi.string().required(), + role: joi.string().required(), + }) + + static const validateMessage (msg) { + let res = Joi.validate(msg, messageTemplate, {allowUnknown: true}); + + if (!res.error) { + res = Joi.validate(msg, schema[msg.id]); + } + + return res; + } + + _parse (message) { + let parsed = { id: '' }; + + try { + parsed = JSON.parse(message); + } catch (e) { + console.error(e); + } + + let res = validateMessage(parsed); + + if (res.error) { + parsed.validMessage = false; + parsed.errors = res.error; + } else { + parsed.validMessage = true; + } + + return parsed; + } +} diff --git a/labs/node-bbb-apps/lib/connection-manager/RedisConnectionManager.js b/labs/node-bbb-apps/lib/connection-manager/RedisConnectionManager.js new file mode 100644 index 0000000000000000000000000000000000000000..6c109baf75ac71b9c54a12f1ccca3f988a46ff90 --- /dev/null +++ b/labs/node-bbb-apps/lib/connection-manager/RedisConnectionManager.js @@ -0,0 +1,34 @@ +'use strict'; + +// incomplete + +module.exports = class RedisConnectionManager { + + constructor(options) { + + this._client = redis.createClient({options}); + this._pubchannel = options.pubchannel; + this._subchannel = optiosn.subchannel; + + if (options.pubchannel) { + this._client.on() + } + + if (options.subchannel) { + this._client.on() + } + + this._client.on() + // pub + + } + + setEventEmitter(emitter) { + this.emitter = emitter; + } + + _onMessage() { + + } + +} diff --git a/labs/node-bbb-apps/lib/connection-manager/WebsocketConnectionManager.js b/labs/node-bbb-apps/lib/connection-manager/WebsocketConnectionManager.js new file mode 100644 index 0000000000000000000000000000000000000000..ba95dd2e05fa197437e0536b0e7e1512badcaecf --- /dev/null +++ b/labs/node-bbb-apps/lib/connection-manager/WebsocketConnectionManager.js @@ -0,0 +1,92 @@ +'use strict'; + +const ws = require('ws'); +const C = require('../bbb/messages/Constants'); + +ws.prototype.setErrorCallback = function(callback) { + + this._errorCallback = callback; +}; + +ws.prototype.sendMessage = function(json) { + + let websocket = this; + + if (this._closeCode === 1000) { + console.log("Websocket closed, not sending"); + this._errorCallback("Error: not opened"); + } + + return this.send(JSON.stringify(json), function(error) { + if(error) { + console.log('server: Websocket error "' + error + '" on message "' + json.id + '"'); + + websocket._errorCallback(error); + } + }); + +}; + +module.exports = class WebsocketConnectionManager { + + constructor(server, path) { + this.wss = new ws.Server({ + server, + path + }); + + this.wss.on('connection', (ws) => { + ws.on('message', this._onMessage.bind(this)); + ws.setErrorCallback(this._onError.bind(this)); + + ws.on('close', this._onClose.bind(this)); + ws.on('error', this._onError.bind(this)); + + console.log(this.emitter); + // TODO: should we delete this listener after websocket dies? + this.emitter.on('response', this._onServerResponse.bind(ws)); + }); + + } + + setEventEmitter(emitter) { + console.log(emitter); + this.emitter = emitter; + } + + _onServerResponse(data) { + + console.log(' [WS] Receiving event '); + console.log(data); + + // Here this is the 'ws' instance + this.sendMessage(data); + } + + _onMessage(data) { + + let message = {}; + + try { + message = JSON.parse(data); + } catch(e) { + console.error(" [WS] JSON message parse error " + e); + message = {}; + } + + // Test for empty or invalid JSON + if (Object.getOwnPropertyNames(message).length !== 0) { + this.emitter.emit(C.WEBSOCKET_MESSAGE, message); + } + } + + _onError(err) { + console.log('Connection error'); + + } + + _onClose(err) { + console.log('closed Connection'); + } + +} diff --git a/labs/node-bbb-apps/lib/h264-sdp.js b/labs/node-bbb-apps/lib/h264-sdp.js new file mode 100644 index 0000000000000000000000000000000000000000..8da8f6d525858749e22f22428680c9985a908f2b --- /dev/null +++ b/labs/node-bbb-apps/lib/h264-sdp.js @@ -0,0 +1,56 @@ +/* + * A module with the sole purpose of removing all non h264 options from an sdpOffer + * + * We use this to prevent any transcoding from the Kurento side if Firefox or Chrome offer VP8/VP9 as + * the default format. + */ + +var sdpTransform = require('sdp-transform'); + +exports.transform = function(sdp) { + + var mediaIndex = 0; + var res = sdpTransform.parse(sdp); + var validPayloads; + + if (res.media[0].type === 'audio') { + // Audio + res.media[mediaIndex].rtp = res.media[mediaIndex].rtp.filter(function(elem) { + return elem.codec === 'opus'; + }); + + validPayloads = res.media[mediaIndex].rtp.map(function(elem) { + return elem.payload; + }); + + res.media[mediaIndex].fmtp = res.media[mediaIndex].fmtp.filter(function(elem) { + return validPayloads.indexOf(elem.payload) >= 0; + }); + + res.media[mediaIndex].payloads = validPayloads.join(' '); + + mediaIndex += 1; + } + + // Video + res.media[mediaIndex].rtp = res.media[mediaIndex].rtp.filter(function(elem) { + return elem.codec === 'H264'; + }); + + validPayloads = res.media[mediaIndex].rtp.map(function(elem) { + return elem.payload; + }); + + res.media[mediaIndex].fmtp = res.media[mediaIndex].fmtp.filter(function(elem) { + return validPayloads.indexOf(elem.payload) >= 0; + }); + + res.media[mediaIndex].rtcpFb = res.media[mediaIndex].rtcpFb.filter(function(elem) { + return validPayloads.indexOf(elem.payload) >= 0; + }); + + res.media[mediaIndex].payloads = validPayloads.join(' '); + + return sdpTransform.write(res); +}; + diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js b/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js new file mode 100644 index 0000000000000000000000000000000000000000..341fe4309ca6efbdc38526c6c9ebc3fb855226d1 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/constants/Constants.js @@ -0,0 +1,84 @@ +/* + * (C) Copyright 2016 Mconf Tecnologia (http://mconf.com/) + */ + +/** + * @classdesc + * Message constants for the communication with BigBlueButton + * @constructor + */ + +'use strict' + +exports.ALL = 'ALL' + +exports.LOG_LEVEL = {} +exports.LOG_LEVEL.DEBUG = 0 +exports.LOG_LEVEL.INFO = 1 +exports.LOG_LEVEL.WARN = 2 +exports.LOG_LEVEL.ERROR = 3 +exports.LOG_LEVEL.OFF = 100 + +exports.STATUS = {} +exports.STATUS.STARTED = "STARTED" +exports.STATUS.STOPPED = "STOPPED" +exports.STATUS.RUNNING = "RUNNING'" +exports.STATUS.STARTING = "STARTING" +exports.STATUS.STOPPING = "STOPPING" +exports.STATUS.RESTARTING = "RESTARTING" + +exports.USERS = {} +exports.USERS.SFU = "SFU" +exports.USERS.MCU = "MCU" + +exports.MEDIA_TYPE = {} +exports.MEDIA_TYPE.WEBRTC = "WebRtcEndpoint" +exports.MEDIA_TYPE.RTP= "RtpEndpoint" +exports.MEDIA_TYPE.URI = "PlayerEndpoint" + +// Observer Constants +exports.EVENT = {} +exports.EVENT.DIAL_EVENT = "BRIDGE_DIAL" +exports.EVENT.HANGUP_EVENT = "BRIDGE_HANGUP" +exports.EVENT.SESSION_ID_EVENT = "SESSION_ID" +exports.EVENT.AUDIO_SESSION_TERMINATED = "AUDIO_SESSION_TERMINATED" + +// Media server state changes +exports.EVENT.MEDIA_STATE = {}; +exports.EVENT.MEDIA_STATE.MEDIA_EVENT = "MediaEvent" +exports.EVENT.MEDIA_STATE.CHANGED = "MediaStateChanged" +exports.EVENT.MEDIA_STATE.FLOW_OUT = "MediaFlowOutStateChange" +exports.EVENT.MEDIA_STATE.FLOW_IN = "MediaFlowInStateChange" +exports.EVENT.MEDIA_STATE.ENDOFSTREAM = "EndOfStream" + + + +// RTP params +exports.SDP = {}; +exports.SDP.PARAMS = "params" +exports.SDP.MEDIA_DESCRIPTION = "media_description" +exports.SDP.LOCAL_IP_ADDRESS = "local_ip_address" +exports.SDP.LOCAL_VIDEO_PORT = "local_video_port" +exports.SDP.DESTINATION_IP_ADDRESS = "destination_ip_address" +exports.SDP.DESTINATION_VIDEO_PORT = "destination_video_port" +exports.SDP.REMOTE_VIDEO_PORT = "remote_video_port" +exports.SDP.CODEC_NAME = "codec_name" +exports.SDP.CODEC_ID = "codec_id" +exports.SDP.CODEC_RATE = "codec_rate" +exports.SDP.RTP_PROFILE = "rtp_profile" +exports.SDP.SEND_RECEIVE = "send_receive" +exports.SDP.FRAME_RATE = "frame_rate" + +// Strings +exports.STRING = {} +exports.STRING.ANONYMOUS = "ANONYMOUS" +exports.STRING.FS_USER_AGENT_STRING = "Freeswitch_User_Agent" +exports.STRING.XML_MEDIA_FAST_UPDATE = '<?xml version=\"1.0\" encoding=\"utf-8\" ?>' + + '<media_control>' + + '<vc_primitive>' + + '<to_encoder>' + + '<picture_fast_update>' + + '</picture_fast_update>' + + '</to_encoder>' + + '</vc_primitive>' + + '</media_control>' diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js new file mode 100644 index 0000000000000000000000000000000000000000..1410b60982d3c3953430dcdec4422484fd691def --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/MCSApiStub.js @@ -0,0 +1,99 @@ +'use strict' + +var config = require('config'); +var C = require('../constants/Constants'); +// EventEmitter +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var MediaController = require('./MediaController.js'); + + +module.exports = class MCSApiStub extends EventEmitter { + constructor() { + super(); + this._mediaController = new MediaController(); + } + + async join (room, type, params) { + let self = this; + try { + const answer = await this._mediaController.join(room, type, params); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + Promise.reject(err); + } + } + + // Not yet implemented in MediaController, should be simple nonetheless + async leave (room, userId) { + try { + const answer = await this._mediaController.leave(room, userId); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + + async publishnsubscribe (user, sourceId, sdp, params) { + try { + const answer = await this._mediaController.publishnsubscribe(user, sourceId, sdp, params); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + + async publish (user, room, type, params) { + try { + const answer = await this._mediaController.publish(user, room, type, params); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + + async unpublish (user, mediaId) { + try { + const answer = await this._mediaController.unpublish(user, mediaId); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + + async subscribe (user, sourceId, type, params) { + try { + const answer = await this._mediaController.subscribe(user, sourceId, type, params); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + return Promise.reject(err); + } + } + + async unsubscribe (user, sdp, params) { + try { + await this._mediaController.unsubscribe(user, mediaId); + return Promise.resolve(answer); + } + catch (err) { + console.log(err); + Promise.reject(err); + } + } + + setStrategy (strategy) { + // TODO + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js new file mode 100644 index 0000000000000000000000000000000000000000..4afcacf1450529ac38a322bd128b1a263da331a1 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/MediaController.js @@ -0,0 +1,231 @@ +'use strict' + +var config = require('config'); +var C = require('../constants/Constants'); + +// Model +var SfuUser = require('../model/SfuUser'); +var Room = require('../model/Room.js'); +var mediaServer = require('./media-server'); + + +/* PRIVATE ELEMENTS */ +/** + * Deep copy a javascript Object + * @param {Object} object The object to be copied + * @return {Object} A deep copy of the given object + */ +function copy(object) { + return JSON.parse(JSON.stringify(object)); +} + +function getPort(min_port, max_port) { + return Math.floor((Math.random()*(max_port - min_port +1)+ min_port)); +} + +function getVideoPort() { + return getPort(config.get('sip.min_video_port'), config.get('sip.max_video_port')); +} + +/* PUBLIC ELEMENTS */ + +module.exports = class MediaController { + constructor() { + // Kurento + this._rooms = {}; + this._users = {}; + } + + start (_kurentoClient, _kurentoToken, callback) { + var self = this; + return callback(null); + } + + stop (callback) { + var self = this; + self.stopAllMedias(function (e) { + if (e) { + callback(e); + } + self._rooms = {}; + }); + } + + getVideoPort () { + return getPort(config.get('sip.min_video_port'), config.get('sip.max_video_port')); + } + + getRoom (roomId) { + return this._rooms[roomdId]; + } + + async join (roomId, type, params) { + console.log("[mcs] join"); + try { + const room = await this.createRoomMCS(roomId); + const user = await this.createUserMCS(roomId, type, params); + let userId = user.id; + room.setUser(user); + if (params.sdp) { + const sessionId = user.addSdp(params.sdp); + } + if (params.uri) { + const sessionId = user.addUri(params.sdp); + } + console.log("[mcs] Resolving user " + userId); + return Promise.resolve(userId); + } + catch (err) { + console.log("[mcs] JOIN ERROR " + err); + return Promise.reject(new Error(err)); + } + } + + async publishnsubscribe (userId, sourceId, sdp, params) { + console.log("[mcs] pns"); + let type = params.type; + try { + const user = await this.getUserMCS(userId); + let userId = user.id; + const sessionId = user.addSdp(sdp, type); + const answer = await user.startSession(sessionId); + await user.connect(sourceId, sessionId); + console.log("[mcs] user with sdp session " + sessionId); + console.log(user); + return Promise.resolve({userId, sessionId}); + } + catch (err) { + console.log("[mcs] PUBLISHNSUBSCRIBE ERROR " + err); + return Promise.reject(new Error(err)); + } + } + + async publish (userId, roomId, type, params) { + console.log("[mcs] publish"); + let mediaId; + // TODO handle mediaType + let mediaType = params.mediaType; + let answer; + + try { + const user = await this.getUserMCS(userId); + + switch (type) { + case "SDP": + mediaId = user.addSdp(params.descriptor, type); + answer = await user.startSession(mediaId); + break; + case "URI": + mediaId = user.addUri(params.descriptor, type); + answer = await user.startSession(mediaId); + break; + + default: return Promise.reject(new Error("[mcs] Invalid media type")); + } + } + catch (err) { + return Promise.reject(new Error(err)); + } + console.log(user); + return Promise.resolve({answer, mediaId}); + } + + async subscribe (userId, type, sourceId, params) { + let mediaId; + // TODO handle mediaType + let mediaType = params.mediaType; + + try { + const user = this.getUserMCS(userId); + + switch (type) { + case "SDP": + mediaId = user.addSdp(params.descriptor, type); + await user.connect(sourceId, mediaId); + const answer = await user.startSession(mediaId); + break; + case "URI": + //TODO + //mediaId = user.addUri(params.descriptor); + break; + + default: return Promise.reject(new Error("[mcs] Invalid media type")); + } + } + catch (err) { + return Promise.reject(new Error(err)); + } + + return Promise.resolve({answer, mediaId}); + } + + + async unpublish (userId, mediaId) { + try { + const user = this.getUserMCS(userId); + const answer = await user.unpublish(mediaId); + return Promise.resolve(answer); + } + catch (err) { + return Promise.reject(new Error(err)); + } + } + + async unsubscribe (userId, mediaId) { + try { + const user = this.getUserMCS(userId); + const answer = await user.unsubscribe(mediaId); + return Promise.resolve(); + } + catch (err) { + return Promise.reject(new Error(err)); + } + } + + /** + * Creates an empty {Room} room and indexes it + * @param {String} roomId + */ + async createRoomMCS (roomId) { + var self = this; + + console.log(" [media] Creating new room with ID " + roomId); + + if(!self._rooms[roomId]) { + self._rooms[roomId] = new Room(roomId); + } + + return Promise.resolve(self._rooms[roomId]); + } + + /** + * Creates an {User} of type @type + * @param {String} roomId + */ + createUserMCS (roomId, type, params) { + let self = this; + let user; + console.log(" [media] Creating a new user[" + type + "]"); + + switch (type) { + case C.USERS.SFU: + user = new SfuUser(roomId, type, params.userAgentString, params.sdp); + break; + case C.USERS.MCU: + console.log(" [media] createUserMCS MCU TODO"); + break; + default: + console.log(" [controller] Unrecognized user type"); + } + + if(!self._users[user.id]) { + self._users[user.id] = user; + } + + return Promise.resolve(user); + } + + getUserMCS (userId) { + Promise.resolve(self._users[user.id]); + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js b/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js new file mode 100644 index 0000000000000000000000000000000000000000..118c860cd2f6b4d730d4d916a90f5ba7809f91ed --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/media/media-server.js @@ -0,0 +1,233 @@ +'use strict' + +const C = require('../constants/Constants.js'); +const config = require('config'); +const mediaServerClient = require('kurento-client'); +const util = require('util'); +const EventEmitter = require('events').EventEmitter; + +/* Public members */ +exports = class MediaServer extends EventEmitter { + constructor(serverUri) { + super(); + this._serverUri = serverUri; + this._mediaServer = _getMediaServerClient(this._serverUri); + this._mediaPipelines = {}; + this._mediaElements= {}; + } + + _getMediaServerClient (serverUri) { + return new Promise((resolve, reject) => { + mediaServerClient(serverUri, (error, client) => { + if (error) { + reject(error); + } + resolve(client); + }); + }); + } + + _getMediaPipeline (conference) { + return new Promise((resolve, reject) => { + if (_mediaPipelines[conference]) { + console.log(' [media] Pipeline already exists. ' + JSON.stringify(_mediaPipelines, null, 2)); + resolve(_mediaPipelines[conference]); + } + else { + this._mediaServer.create('MediaPipeline', function(err, pipeline) { + if (error) { + console.log(error); + reject(error); + } + _mediaPipelines[conference] = pipeline; + resolve(pipeline); + }); + } + }); + } + + _releasePipeline (pipelineId) { + let mediaPipeline = _mediaPipelines[pipelineId]; + + if (typeof mediaPipeline !== 'undefined' && typeof mediaPipeline.release === 'function') { + mediaElement.release(); + } + } + + _createElement (pipeline, type) { + return new Promise((resolve, reject) => { + pipeline.create(type, (error, mediaElement) => { + if (error) { + reject(error); + } + console.log(" [MediaController] Created [" + type + "] media element: " + mediaElement.id); + _mediaElements[mediaElement.id] = mediaElement; + resolve(mediaElement); + }); + }); + } + + + async createMediaElement (conference, type) { + try { + const pipeline = await this._getMediaPipeline(conference); + const mediaElement = await this._createElement(pipeline, type); + Promise.resolve(mediaElement.id); + } + catch (err) { + return Promise.reject(new Error(e)); + } + } + + async connectMediaElements (sourceId, sinkId, type) { + let source = _mediaElements[sourceId]; + let sink = _mediaElements[sinkId]; + + if (source && sink) { + return new Promise((resolve, reject) => { + switch (type) { + case 'ALL': + source.connect(sink, (error) => { + if (error) { + reject(error); + } + resolve(); + }); + break; + + + case 'AUDIO': + case 'VIDEO': + source.connect(sink, (error) => { + if (error) { + reject(error); + } + resolve(); + }); + break; + + default: reject("[mcs] Invalid connect type"); + } + }); + } + + return Promise.reject("Failed to connect " + type + ": " + sourceId + " to " + sinkId); + } + + stop (elementId) { + let mediaElement = _mediaElements[elementId]; + // TODO remove event listeners + if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { + mediaElement.release(); + } + } + + + addIceCandidate (elementId, candidate) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.addIceCandidate === 'function') { + mediaElement.addIceCandidate(candidate); + } + } + + gatherCandidates (elementId) { + let mediaElement = _mediaElements[elementId]; + + return new Promise((resolve, reject) => { + if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') { + mediaElement.gatherCandidates((error) => { + if (error) { + reject(error); + } + resolve(); + }); + } + reject(" [MediaController/gatherCandidates] There is no element " + elementId); + }); + } + + setInputBandwidth (elementId, min, max) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined') { + endpoint.setMinVideoRecvBandwidth(min); + endpoint.setMaxVideoRecvBandwidth(max); + } else { + return (" [MediaController/setInputBandwidth] There is no element " + elementId); + } + } + + setOutputBandwidth (endpoint, min, max) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined') { + endpoint.setMinVideoSendBandwidth(min); + endpoint.setMaxVideoSendBandwidth(max); + } else { + return (" [MediaController/setOutputBandwidth] There is no element " + elementId ); + } + } + + setOutputBitrate (endpoint, min, max) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined') { + endpoint.setMinOutputBitrate(min); + endpoint.setMaxOutputBitrate(max); + } else { + return (" [MediaController/setOutputBitrate] There is no element " + elementId); + } + } + + processOffer (elementId, sdpOffer) { + let mediaElement = _mediaElements[elementId]; + + return new Promise((resolve, reject) => { + if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') { + mediaElement.processOffer(sdpOffer, (error, answer) => { + if (error) { + reject(error); + } + resolve(answer); + }); + } + reject(" [MediaController/processOffer] There is no element " + elementId); + }); + } + + trackMediaState (elementId, type) { + switch (type) { + case C.MEDIA_TYPE.URI: + this.addMediaEventListener(elementId, C.MEDIA_TYPE.ENDOFSTREAM); + this.addMediaEventListener(elementId, C.MEDIA_TYPE.CHANGED); + this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_IN); + this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_OUT); + break; + + case C.MEDIA_TYPE.WEBRTC: + case C.MEDIA_TYPE.RTP: + this.addMediaEventListener(elementId, C.MEDIA_TYPE.CHANGED); + this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_IN); + this.addMediaEventListener(elementId, C.MEDIA_TYPE.FLOW_OUT); + break; + + default: return; + } + return; + } + + addMediaEventListener (elementId, eventTag) { + let mediaElement = _mediaElements[elementId]; + // TODO event type validator + if (typeof mediaElement !== 'undefined' && mediaElement) { + mediaElement.on(eventTag, (event) => { + this.emit(C.MEDIA_STATE.MEDIA_EVENT , {elementId, eventTag, event}); + }); + } + } + + notifyMediaState (elementId, eventTag, event) { + this.emit(C.MEDIA_STATE.MEDIA_EVENT , {elementId, eventTag, event}); + } +}; diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/Room.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/Room.js new file mode 100644 index 0000000000000000000000000000000000000000..8bcc8a7d8e61b170c0a74315504f3b9ef64490c8 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/Room.js @@ -0,0 +1,43 @@ +/** + * @classdesc + * Model class for rooms + */ + +'use strict' + +module.exports = class Room { + constructor(id) { + this._id = id; + this._users = {}; + this._mcuUsers = {}; + } + + getUser (id) { + return this._users[id]; + } + + getMcuUser (id) { + return this._mcuUsers[id]; + } + + setUser (user) { + if (typeof this._users[user.id] == 'undefined' || + !this._users[user.id]) { + this._users[user.id] = {}; + } + this._users[user.id] = user; + } + + destroyUser(user) { + let _user = this._users[user.id]; + _user.destroy(); + delete this._users[user.id]; + } + + destroyMcuUser (user) { + let _user = this._mcuUsers[user.id]; + _user.destroy(); + delete this._mcuUsers[user.id]; + } + +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js new file mode 100644 index 0000000000000000000000000000000000000000..752f971d47b650f3958a57a35a472516255b725f --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/SdpSession.js @@ -0,0 +1,78 @@ +/** + * @classdesc + * Model class for external devices + */ + +'use strict' + +const C = require('../constants/Constants'); +const SdpWrapper = require('../utils/SdpWrapper'); +const uuidv4 = require('uuid/v4'); +const EventEmitter = require('events').EventEmitter; +const MediaServer = require('../media/media-server'); + +module.exports = class SdpSession extends EventEmitter { + constructor(sdp = null, type = C.MEDIA_TYPE.RTP) { + super(); + this.id = uuidv4(); + this._status = C.STATUS.STOPPED; + this._type = type; + // {SdpWrapper} SdpWrapper + this._sdp; + if (sdp && type) { + this.setSdp(sdp, type); + } + } + + async setSdp (sdp, type) { + this._sdp = new SdpWrapper(sdp, type); + await this._sdp.processSdp(); + } + + async start (sdpId) { + this._status = C.STATUS.STARTING; + try { + console.log("[mcs] start/cme"); + const mediaElement = await MediaServer.createMediaElement(this.id, this._type); + console.log("[mcs] start/po " + mediaElement); + const answer = await MediaServer.processOffer(mediaElement, this._sdp.getMainDescription()); + Promise.resolve(answer); + } + catch (err) { + this.handleError(err); + Promise.reject(err); + } + } + + // TODO move to parent Session + async stop () { + this._status = C.STATUS.STOPPING; + try { + await MediaServer.stop(this.id); + this._status = C.STATUS.STOPPED; + Promise.resolve(); + } + catch (err) { + this.handleError(err); + Promise.reject(err); + } + } + + + // TODO move to parent Session + async connect (sinkId) { + try { + await MediaServer.connect(this.id, sinkId); + Promise.resolve(); + } + catch (err) { + this.handleError(err); + Promise.reject(err); + } + } + + handleError (err) { + console.log(err); + this._status = C.STATUS.STOPPED; + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js new file mode 100644 index 0000000000000000000000000000000000000000..728447b6efecd32f6aa3ac39f644a8569e05234c --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/SfuUser.js @@ -0,0 +1,155 @@ +/** + * @classdesc + * Model class for external devices + */ + +'use strict' + +const User = require('./User'); +const C = require('../constants/Constants'); +const SdpWrapper = require('../utils/SdpWrapper'); +const SdpSession = require('../model/SdpSession'); +const UriSession = require('../model/UriSession'); + +module.exports = class SfuUser extends User { + constructor(_roomId, type, userAgentString = C.STRING.ANONYMOUS, sdp = null, uri = null) { + super(_roomId); + // {SdpWrapper} SdpWrapper + this._sdp; + // {Object} hasAudio, hasVideo, hasContent + this._mediaSessions = {} + this.userAgentString; + if (sdp) { + this.addSdp(sdp); + } + if (uri) { + this.addUri(uri); + } + } + + async addUri (uri, type) { + // TODO switch from type to children UriSessions (RTSP|HTTP|etc) + let session = new UriSession(uri, type); + + if (typeof this._mediaSessions[session.id] == 'undefined' || + !this._mediaSessions[session.id]) { + this._mediaSessions[session.id] = {}; + } + this._mediaSessions[session.id] = session; + try { + await this.startSession(session.id); + Promise.resolve(session.id); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + addSdp (sdp, type) { + // TODO switch from type to children SdpSessions (WebRTC|SDP) + let session = new SdpSession(sdp, type); + + if (typeof this._mediaSessions[session.id] == 'undefined' || + !this._mediaSessions[session.id]) { + this._mediaSessions[session.id] = {}; + } + this._mediaSessions[session.id] = session; + console.log("[mcs] Added SDP " + session.id); + + return session.id; + } + + async startSession (sessionId) { + console.log("[mcs] starting session " + sessionId); + let session = this._mediaSessions[sessionId]; + + try { + const answer = await session.start(); + Promise.resolve(answer); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async subscribe (sdp, mediaId) { + let sessionId = await this.addSdp(sdp); + try { + await this.startSession(sessionId); + await this.connect(sessionId, mediaId); + Promise.resolve(); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async publish (sdp, mediaId) { + let sessionId = await this.addSdp(sdp); + try { + await this.startSession(sessionId); + Promise.resolve(); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async unsubscribe (sdp, mediaId) { + try { + await this.stopSession(sessionId); + Promise.resolve(); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async unpublish (sdp, mediaId) { + try { + await this.stopSession(sessionId); + Promise.resolve(); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async stopSession (sdpId) { + let session = this._mediaSessions[sdpId]; + + try { + await session.stop(); + Promise.resolve(); + } + catch (err) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + + async connect (sourceId, sinkId) { + let session = this._mediaSessions[sessionId]; + if(session) { + try { + await session.connect(sinkId); + Promise.resolve(); + } + catch (error) { + this.handleError(err); + Promise.reject(new Error(err)); + } + } + } + + handleError (err) { + console.log(err); + this._status = C.STATUS.STOPPED; + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/UriSession.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/UriSession.js new file mode 100644 index 0000000000000000000000000000000000000000..74b7795bcc8b8c75918b965355f7ba3716985bf9 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/UriSession.js @@ -0,0 +1,73 @@ +/** + * @classdesc + * Model class for external devices + */ + +'use strict' + +const C = require('../constants/Constants'); +const uuidv4 = require('uuid/v4'); +const EventEmitter = require('events').EventEmitter; +const MediaServer = require('../media/media-server'); + +module.exports = class UriSession extends EventEmitter { + constructor(uri = null) { + super(); + this.id = uuidv4(); + this._status = C.STATUS.STOPPED; + this._uri; + if (uri) { + this.setUri(uri); + } + } + + setUri (uri) { + this._uri = uri; + } + + async start () { + this._status = C.STATUS.STARTING; + try { + const mediaElement = await MediaServer.createMediaElement(this.id, C.MEDIA_TYPE.URI); + console.log("start/cme"); + await MediaServer.play(this.id); + this._status = C.STATUS.STARTED; + return Promise.resolve(); + } + catch (err) { + this.handleError(err); + return Promise.reject(new Error(err)); + } + } + + // TODO move to parent Session + async stop () { + this._status = C.STATUS.STOPPING; + try { + await MediaServer.stop(this.id); + this._status = C.STATUS.STOPPED; + return Promise.resolve(); + } + catch (err) { + this.handleError(err); + return Promise.reject(new Error(err)); + } + } + + // TODO move to parent Session + async connect (sinkId) { + try { + await MediaServer.connect(this.id, sinkId); + return Promise.resolve() + } + catch (err) { + this.handleError(err); + return Promise.reject(new Error(err)); + } + } + + handleError (err) { + console.log(err); + this._status = C.STATUS.STOPPED; + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/model/User.js b/labs/node-bbb-apps/lib/mcs-core/lib/model/User.js new file mode 100644 index 0000000000000000000000000000000000000000..b3d073a33788ec9c3978d23f81ecc737a5ba751d --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/model/User.js @@ -0,0 +1,18 @@ +/** + * @classdesc + * Model class for external devices + */ + +'use strict' + +const uuidv4 = require('uuid/v4'); +const User = require('./User'); +const C = require('../constants/Constants.js'); + +module.exports = class User { + constructor(roomId, type, userAgentString = C.STRING.ANONYMOUS) { + this.roomId = roomId; + this.id = uuidv4(); + this.userAgentString = userAgentString; + } +} diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js b/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js new file mode 100644 index 0000000000000000000000000000000000000000..64d8c9b76300a2c02f0231e62b36f59d21656156 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/utils/SdpWrapper.js @@ -0,0 +1,256 @@ +/** + * @classdesc + * Utils class for manipulating SDP + */ + +'use strict' + +var config = require('config'); +var transform = require('sdp-transform'); + +module.exports = class SdpWrapper { + constructor(sdp) { + this._plainSdp = sdp; + this._jsonSdp = transform.parse(sdp); + this._mediaLines = {}; + this._mediaCapabilities = {}; + this._profileThreshold = "ffffff"; + } + + setSdp (sdp) { + this._plainSdp = sdp; + this._jsonSdp = transform.parse(sdp); + } + + getPlainSdp () { + return this._plainSdp; + } + + getJsonSdp () { + return this._jsonSdp; + } + + removeFmtp () { + return this._plainSdp.replace(/(a=fmtp:).*/g, ''); + } + + replaceServerIpv4 (ipv4) { + return this._plainSdp.replace(/(IP4\s[0-9.]*)/g, 'IP4 ' + ipv4); + } + + getCallId () { + return this._plainSdp.match(/(call-id|i):\s(.*)/i)[2]; + } + + /** + * Given a SDP, test if there is more than on video description + * @param {string} sdp The Session Descriptor + * @return {boolean} true if there is more than one video description, else false + */ + hasAudio () { + return /(m=audio)/i.test(this._plainSdp); + } + + /** + * Given a SDP, test if there is a video description in it + * @param {string} sdp The Session Descriptor + * @return {boolean} true if there is a video description, else false + */ + hasVideo (sdp) { + return /(m=video)/i.test(sdp); + } + + /** + * Given a SDP, test if there is more than on video description + * @param {string} sdp The Session Descriptor + * @return {boolean} true if there is more than one video description, else false + */ + hasMultipleVideo (sdp) { + return /(m=video)([\s\S]*\1){1,}/i.test(sdp); + } + + /** + * Given a SDP, return its Session Description + * @param {string} sdp The Session Descriptor + * @return {string} Session description (SDP until the first media line) + */ + getSessionDescription (sdp) { + return sdp.match(/[\s\S]+?(?=m=audio|m=video)/i); + } + + removeSessionDescription (sdp) { + return sdp.match(/(?=[\s\S]+?)(m=audio[\s\S]+|m=video[\s\S]+)/i)[1]; + } + + getVideoParameters (sdp) { + var res = transform.parse(sdp); + console.log(" [sdp] getVideoParameters => " + JSON.stringify(res, null, 2)); + var params = {}; + params.fmtp = ""; + params.codecId = 96; + var pt = 0; + for(var ml of res.media) { + if(ml.type == 'video') { + if (typeof ml.fmtp[0] != 'undefined' && ml.fmtp) { + params.codecId = ml.fmtp[0].payload; + params.fmtp = ml.fmtp[0].config; + console.log(" [sdp] getVideoParameters fmtp => " + JSON.stringify(params)); + return params; + } + } + } + return params; + } + + /** + * Given a SDP, return its Content Description + * @param {string} sdp The Session Descriptor + * @return {string} Content Description (SDP after first media description) + */ + getContentDescription (sdp) { + var res = transform.parse(sdp); + res.media = res.media.filter(function (ml) { return ml.type == "video" }); + var mangledSdp = transform.write(res); + if(typeof mangledSdp != undefined && mangledSdp && mangledSdp != "") { + return mangledSdp; + } + else + return sdp; + } + + /** + * Given a SDP, return its first Media Description + * @param {string} sdp The Session Descriptor + * @return {string} Content Description (SDP after first media description) + */ + getAudioDescription (sdp) { + var res = transform.parse(sdp); + res.media = res.media.filter(function (ml) { return ml.type == "audio" }); + // Hack: Some devices (Snom, Pexip) send crypto with RTP/AVP + // That is forbidden according to RFC3711 and FreeSWITCH rebukes it + res = this.removeTransformCrypto(res); + var mangledSdp = transform.write(res); + this.getSessionDescription(mangledSdp); + if(typeof mangledSdp != undefined && mangledSdp && mangledSdp != "") { + return mangledSdp; + } + else { + return sdp; + } + } + + /** + * Given a SDP, return its first Media Description + * @param {string} sdp The Session Descriptor + * @return {string} Content Description (SDP after first media description) + */ + getMainDescription () { + var res = transform.parse(this._plainSdp); + // Filter should also carry && ml.invalid[0].value != 'content:slides'; + // when content is enabled + res.media = res.media.filter(function (ml) { return ml.type == "video"}); //&& ml.invalid[0].value != 'content:slides'}); + var mangledSdp = transform.write(res); + if (typeof mangledSdp != undefined && mangledSdp && mangledSdp != "") { + console.log(" [sdp] MAIN VIDEO SDP => " + mangledSdp); + return mangledSdp; + } + else { + return sdp; + } + } + + /** + * Given a JSON SDP, remove associated crypto 'a=' lines from media lines + * WARNING: HACK MADE FOR FreeSWITCH ~1.4 COMPATIBILITY + * @param {Object} sdp The Session Descriptor JSON + * @return {Object} JSON SDP without crypto lines + */ + removeTransformCrypto (sdp) { + for(var ml of sdp.media) { + delete ml['crypto']; + } + return sdp; + } + + removeHighQualityFmtps (sdp) { + let res = transform.parse(sdp); + let maxProfileLevel = config.get('kurento.maximum_profile_level_hex'); + let pt = 0; + let idx = 0; + for(var ml of res.media) { + if(ml.type == 'video') { + for(var fmtp of ml.fmtp) { + let fmtpConfig = transform.parseParams(fmtp.config); + let profileId = fmtpConfig['profile-level-id']; + if(typeof profileId !== 'undefined' && parseInt(profileId, 16) > parseInt(maxProfileLevel, 16)) { + console.log(" [sdp] Filtering profile " + parseInt(profileId, 16) + ". Higher than max "+ parseInt(maxProfileLevel, 16)); + pt = fmtp.payload; + delete ml.fmtp[idx]; + ml.rtp = ml.rtp.filter((rtp) => { return rtp.payload != pt}); + } + else { + // Remove fmtp further specifications + //let configProfile = "profile-level-id="+profileId; + //fmtp.config = configProfile; + } + idx++; + } + } + } + var mangledSdp = transform.write(res); + return mangledSdp; + } + + async processSdp () { + let description = this._plainSdp; + if(config.get('kurento.force_low_resolution')) { + description = this.removeFmtp(description); + } + + description = description.toString().replace(/telephone-event/, "TELEPHONE-EVENT"); + + this._mediaCapabilities.hasVideo = this.hasVideo(description); + this._mediaCapabilities.hasAudio = this.hasAudio(description); + this._mediaCapabilities.hasContent = this.hasMultipleVideo(description); + this.sdpSessionDescription = this.getSessionDescription(description); + this.audioSdp = this.getAudioDescription(description); + this.mainVideoSdp = this.getMainDescription(description); + this.mainVideoSdp = this.removeHighQualityFmtps(this.mainVideoSdp); + this.contentVideoSdp = this.getContentDescription(description); + + return; + } + + /* DEVELOPMENT METHODS */ + _disableMedia (sdp) { + return sdp.replace(/(m=application\s)\d*/g, "$10"); + }; + + /** + * Given a SDP, add Floor Control response + * @param {string} sdp The Session Descriptor + * @return {string} A new Session Descriptor with Floor Control + */ + _addFloorControl (sdp) { + return sdp.replace(/a=inactive/i, 'a=sendrecv\r\na=floorctrl:c-only\r\na=setup:active\r\na=connection:new'); + } + + /** + * Given a SDP, add Floor Control response to reinvite + * @param {string} sdp The Session Descriptor + * @return {string} A new Session Descriptor with Floor Control Id + */ + _addFloorId (sdp) { + sdp = sdp.replace(/(a=floorctrl:c-only)/i, '$1\r\na=floorid:1 m-stream:3'); + return sdp.replace(/(m=video.*)([\s\S]*?m=video.*)([\s\S]*)/i, '$1\r\na=content:main\r\na=label:1$2\r\na=content:slides\r\na=label:3$3'); + } + + /** + * Given the string representation of a Session Descriptor, remove it's video + * @param {string} sdp The Session Descriptor + * @return {string} A new Session Descriptor without the video + */ + _removeVideoSdp (sdp) { + return sdp.replace(/(m=video[\s\S]+)/g,''); + }; +}; diff --git a/labs/node-bbb-apps/lib/mcs-core/lib/utils/sdp-utils.js b/labs/node-bbb-apps/lib/mcs-core/lib/utils/sdp-utils.js new file mode 100644 index 0000000000000000000000000000000000000000..11b06b0bcf49d721be79203de85fe307962b74a7 --- /dev/null +++ b/labs/node-bbb-apps/lib/mcs-core/lib/utils/sdp-utils.js @@ -0,0 +1,37 @@ +/** + * @classdesc + * Utils class for SDP generation + */ + +module.exports.generateSdp = function(remote_ip_address, remote_video_port) { + return "v=0\r\n" + + "o=- 0 0 IN IP4 " + remote_ip_address + "\r\n" + + "s=No Name\r\n" + + "c=IN IP4 " + remote_ip_address + "\r\n" + + "t=0 0\r\n" + + "m=video " + remote_video_port + " RTP/AVP 96\r\n" + + "a=rtpmap:96 H264/90000\r\n" + + "a=ftmp:96 packetization-mode=0\r\n"; +} + +/** + * Generates a video SDP given the media specs + * @param {string} sourceIpAddress The source IP address of the media + * @param {string} sourceVideoPort The source video port of the media + * @param {string} codecId The ID of the codec + * @param {string} sendReceive The SDP flag of the media flow + * direction, 'sendonly', 'recvonly' or 'sendrecv' + * @param {String} rtpProfile The RTP profile of the RTP Endpoint + * @param {String} codecName The name of the codec used for the RTP + * Endpoint + * @param {String} codecRate The codec rate + * @return {string} The Session Descriptor for the media + */ +module.exports.generateVideoSdp = function (sourceIpAddress, sourceVideoPort, codecId, sendReceive, rtpProfile, codecName, codecRate, fmtp) { + return 'm=video ' + sourceVideoPort + ' ' + rtpProfile + ' ' + codecId + '\r\n' + + 'a=' + sendReceive + '\r\n' + + 'c=IN IP4 ' + sourceIpAddress + '\r\n' + + 'a=rtpmap:' + codecId + ' ' + codecName + '/' + codecRate + '\r\n' + + 'a=fmtp:' + codecId + ' ' + fmtp + '\r\n'; +}; + diff --git a/labs/node-bbb-apps/lib/media-controller.js b/labs/node-bbb-apps/lib/media-controller.js new file mode 100644 index 0000000000000000000000000000000000000000..df9697ec6984deb5f425fbb7c210fba4b6ce62e6 --- /dev/null +++ b/labs/node-bbb-apps/lib/media-controller.js @@ -0,0 +1,141 @@ +'use strict' + +const Constants = require('./bbb/messages/Constants.js'); +const config = require('config'); +const kurento = require('kurento-client'); +const mediaServerClient = null; + +var _mediaPipelines = {}; +var _mediaElements= {}; + +function createMediaPipeline(id, callback) { + console.log(' [media] Creating media pipeline for ' + id); + getMediaServerClient(function (error, mediaServerClient) { + mediaServerClient.create('MediaPipeline', function(err, pipeline) { + if (error) { + console.log("Could not find media server at address " + kurentoUrl); + return callback(error); + } + return callback(null , pipeline); + }); + }); +}; + +function getMediaServerClient (callback) { + let kurentoUrl = config.get('kurentoUrl'); + if (mediaServerClient) { + callback(null, mediaServerClient); + } + else { + kurento(kurentoUrl, function(error, _mediaServerClient) { + if (error) { + console.log("Could not find media server at address " + kurentoUrl); + return callback(error, null); + } + + console.log(" [server] Initiating kurento client. Connecting to: " + kurentoUrl); + return callback(null, _mediaServerClient); + }); + } +}; + +/* Public members */ +module.exports = { + + createMediaElement : function (conference, type, callback) { + let self = this; + self.getMediaPipeline(conference, function(error, pipeline) { + + pipeline.create(type, function(error, mediaElement) { + if (error) { + return callback(error, null); + } + console.log(" [MediaController] Created [" + type + "] media element: " + mediaElement.id); + _mediaElements[mediaElement.id] = mediaElement; + return callback(null, mediaElement); + }); + }); + }, + + connectMediaElements : function (sourceId, sinkId, type, callback) { + let source = _mediaElements[sourceId]; + let sink = _mediaElements[sinkId]; + + if (source && sink) { + if (type === 'ALL') { + source.connect(sink, function (error) { + return callback (error); + }); + } else { + console.log(typeof source.connect); + source.connect(sink, type, function (error) { + return callback (error); + }); + } + } else { + return callback ("Failed to connect " + type + ": " + sourceId + " to " + sinkId); + } + }, + + releaseMediaElement : function (elementId) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { + mediaElement.release(); + } + }, + + releasePipeline: function (pipelineId) { + let MediaPipeline = _mediaPipelines[pipelineId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.release === 'function') { + mediaElement.release(); + } + }, + + processOffer : function (elementId, sdpOffer, callback) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.processOffer === 'function') { + mediaElement.processOffer (sdpOffer, function (error, sdpAnswer) { + return callback (error, sdpAnswer); + }); + } else { + return callback (" [MediaController/processOffer] There is no element " + elementId, null); + } + }, + + getMediaPipeline : function(conference, callback) { + let self = this; + + if (_mediaPipelines[conference]) { + console.log(' [media] Pipeline already exists. ' + JSON.stringify(_mediaPipelines, null, 2)); + return callback(null, _mediaPipelines[conference]); + } else { + createMediaPipeline(conference, function(error, pipeline) { + _mediaPipelines[conference] = pipeline; + return callback(error, pipeline); + }); + } + }, + + addIceCandidate : function (elementId, candidate) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.addIceCandidate === 'function') { + mediaElement.addIceCandidate(candidate); + } + }, + + gatherCandidates : function (elementId, callback) { + let mediaElement = _mediaElements[elementId]; + + if (typeof mediaElement !== 'undefined' && typeof mediaElement.gatherCandidates === 'function') { + mediaElement.gatherCandidates(function (error) { + return callback(error); + }); + } else { + return callback (" [MediaController/gatherCandidates] There is no element " + elementId, null); + } + }, +}; diff --git a/labs/node-bbb-apps/lib/media-handler.js b/labs/node-bbb-apps/lib/media-handler.js new file mode 100644 index 0000000000000000000000000000000000000000..2d1ab3815cb3d583be7a115c12ab6a1c6626fa92 --- /dev/null +++ b/labs/node-bbb-apps/lib/media-handler.js @@ -0,0 +1,99 @@ +var config = require('config'); +var kurento = require('kurento-client'); +var Constants = require('./bbb/messages/Constants'); + +var kurentoClient = null; +var mediaPipelines = {}; + +module.exports.getKurentoClient = function(kurentoUrl, callback) { + if (kurentoClient !== null) { + return callback(null, kurentoClient); + } + + kurento(kurentoUrl, function(error, _kurentoClient) { + if (error) { + console.log("Could not find media server at address " + kurentoUrl); + return callback("Could not find media server at address" + kurentoUrl + ". Exiting with error " + error); + } + + console.log(" [MediaHandler] Initiating kurento client. Connecting to: " + kurentoUrl); + + kurentoClient = _kurentoClient; + callback(null, kurentoClient); + }); +} + +module.exports.getMediaPipeline = function(id, callback) { + console.log(' [MediaHandler] Creating media pipeline for ' + id); + + if (mediaPipelines[id]) { + console.log(' [media] Pipeline already exists.'); + callback(null, mediaPipelines[id]); + } else { + kurentoClient.create('MediaPipeline', function(err, pipeline) { + mediaPipelines[id] = pipeline; + return callback(err, pipeline); + }); + } +} + +module.exports.generateSdp = function(remote_ip_address, remote_video_port) { + return "v=0\r\n" + + "o=- 0 0 IN IP4 " + remote_ip_address + "\r\n" + + "s=Kurento-SCREENSHARE\r\n" + + "c=IN IP4 " + remote_ip_address + "\r\n" + + "t=0 0\r\n" + + "m=video " + remote_video_port + " RTP/AVPF 96\r\n" + + "a=rtpmap:96 H264/90000\r\n" + + "a=ftmp:96\r\n"; +} + +module.exports.generateVideoSdp = function (sourceIpAddress, sourceVideoPort) { + return "v=0\r\n" + + "o=- 0 0 IN IP4 " + sourceIpAddress + "\r\n" + + "s=Kurento-SCREENSHARE\r\n" + + 'm=video ' + sourceVideoPort + ' ' + this.videoConfiguration.rtpProfile + ' ' + this.videoConfiguration.codecId + '\r\n' + + 'a=' + this.videoConfiguration.sendReceive + '\r\n' + + 'c=IN IP4 ' + sourceIpAddress + '\r\n' + + 'a=rtpmap:' + this.videoConfiguration.codecId + ' ' + this.videoConfiguration.codecName + '/' + this.videoConfiguration.codecRate + '\r\n' + + 'a=fmtp:' + this.videoConfiguration.codecId + '\r\n' + + 'a=rtcp-fb:' + this.videoConfiguration.codecId + ' ccm fir \r\n' + + 'a=rtcp-fb:' + this.videoConfiguration.codecId + ' nack \r\n' + + 'a=rtcp-fb:' + this.videoConfiguration.codecId + ' nack pli \r\n' + + 'a=rtcp-fb:' + this.videoConfiguration.codecId + ' goog-remb \r\n'; +}; + +module.exports.videoConfiguration = { + codecId: '96', + sendReceive: 'sendrecv', + rtpProfile: 'RTP/AVPF', + codecName: 'H264', + frameRate: '30.000000', + codecRate: '90000' +}; + +module.exports.generateStreamUrl = function (address, meeting, path) { + return "rtmp://" + address + "/video-broadcast/" + meeting + "/" + path; +} + +module.exports.generateTranscoderParams = function (localIp, destIp, sendPort, recvPort, input, streamType, transcoderType, codec, callername) { + var rtpParams = {}; + rtpParams[Constants.LOCAL_IP_ADDRESS] = localIp; + rtpParams[Constants.LOCAL_VIDEO_PORT] = sendPort; + rtpParams[Constants.DESTINATION_IP_ADDRESS] = destIp; + rtpParams[Constants.REMOTE_VIDEO_PORT] = recvPort; + rtpParams[Constants.INPUT] = input; + rtpParams[Constants.STREAM_TYPE] = streamType; + rtpParams[Constants.TRANSCODER_TYPE] = transcoderType; + rtpParams[Constants.TRANSCODER_CODEC] = codec; + rtpParams[Constants.CALLERNAME] = callername; + return rtpParams; +} + +module.exports.getPort = function (min_port, max_port) { + return Math.floor((Math.random() * (max_port - min_port + 1) + min_port)); +} + +module.exports.getVideoPort = function () { + return this.getPort(config.get('minVideoPort'), config.get('maxVideoPort')); +} diff --git a/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js new file mode 100644 index 0000000000000000000000000000000000000000..56cd5705c0beb5f1c80e38f619b2d23472755383 --- /dev/null +++ b/labs/node-bbb-apps/lib/screenshare/ScreenshareManager.js @@ -0,0 +1,178 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +"use strict"; + +const BigBlueButtonGW = require('../bbb/pubsub/bbb-gw'); +var Screenshare = require('./screenshare'); +var C = require('../bbb/messages/Constants'); +// Global variables + +module.exports = class ScreenshareManager { + constructor (logger) { + this._logger = logger; + this._clientId = 0; + + this._sessions = {}; + this._screenshareSessions = {}; + + this._bbbGW = new BigBlueButtonGW(); + this._redisGateway; + } + + async start() { + let self = this; + + try { + this._redisGateway = await this._bbbGW.addSubscribeChannel(C.TO_SCREENSHARE); + this._redisGateway.on(C.REDIS_MESSAGE, this._onMessage.bind(this)); + //this._bbbGW.on(C.GATEWAY_MESSAGE, function() {console.log("AQWOEIHQWUOEHQWUEHQWIUEH")}); + process.on('message', this._onMessage.bind(this)); + console.log(' [ScreenshareManager] Successfully subscribed to redis channel'); + } + catch (error) { + console.log(' [ScreenshareManager] Could not connect to transcoder redis channel, finishing app...'); + console.log(error); + self._stopAll(); + } + } + + _onMessage(_message) { + console.log(' [ScreenshareManager] Received message => '); + let self = this; + let session; + let message = _message; + + console.log(message); + // The sessionId is voiceBridge for screensharing sessions + let sessionId = message.voiceBridge; + if(this._screenshareSessions[sessionId]) { + session = this._screenshareSessions[sessionId]; + } + + switch (message.id) { + + case 'presenter': + + // Checking if there's already a Screenshare session started + // because we shouldn't overwrite it + + if (!self._screenshareSessions[message.voiceBridge]) { + self._screenshareSessions[message.voiceBridge] = {} + self._screenshareSessions[message.voiceBridge] = session; + } + + //session.on('message', self._assembleSessionMessage.bind(self)); + if(session) { + break; + } + + session = new Screenshare(sessionId, self._bbbGW, + sessionId, message.callerName, message.vh, message.vw, + message.internalMeetingId); + + self._screenshareSessions[sessionId] = {} + self._screenshareSessions[sessionId] = session; + + // starts presenter by sending sessionID, websocket and sdpoffer + session._startPresenter(sessionId, message.sdpOffer, function(error, sdpAnswer) { + console.log(" [ScreenshareManager] Started presenter " + sessionId); + if (error) { + self._bbbGW.publish(JSON.stringify({ + id : 'presenterResponse', + response : 'rejected', + message : error + }), C.FROM_SCREENSHARE); + return error; + } + + self._bbbGW.publish(JSON.stringify({ + id : 'presenterResponse', + response : 'accepted', + sdpAnswer : sdpAnswer + }), C.FROM_SCREENSHARE); + + console.log(" [ScreenshareManager] [websocket] Sending presenterResponse \n" + sdpAnswer); + }); + break; + + case 'viewer': + console.log(" [ScreenshareManager][viewer] Session output \n " + session); + if (message.sdpOffer && message.voiceBridge) { + if (session) { + session._startViewer(message.voiceBridge, message.sdpOffer, message.callerName, self._screenshareSessions[message.voiceBridge]._presenterEndpoint); + } else { + // TODO ERROR HANDLING + } + } + break; + + case 'stop': + console.log('[' + message.id + '] connection ' + sessionId); + + if (session) { + session._stop(sessionId); + } else { + console.log(" [stop] Why is there no session on STOP?"); + } + break; + + case 'onIceCandidate': + if (session) { + session._onIceCandidate(message.candidate); + } else { + console.log(" [iceCandidate] Why is there no session on ICE CANDIDATE?"); + } + break; + + case 'ping': + self._bbbGW.publish(JSON.stringify({ + id : 'pong', + response : 'accepted' + }), C.FROM_SCREENSHARE); + break; + + + case 'viewerIceCandidate': + console.log("[viewerIceCandidate] Session output => " + session); + if (session) { + session._onViewerIceCandidate(message.candidate, message.callerName); + } else { + console.log("[iceCandidate] Why is there no session on ICE CANDIDATE?"); + } + break; + + default: + self._bbbGW.publish(JSON.stringify({ + id : 'error', + message: 'Invald message ' + message + }), C.FROM_SCREENSHARE); + break; + } + } + + _stopSession(sessionId) { + console.log(' [>] Stopping session ' + sessionId); + let session = this._screenshareSessions[sessionId]; + if(typeof session !== 'undefined' && typeof session._stop === 'function') { + session._stop(); + } + + delete this._screenshareSessions[sessionId]; + } + + _stopAll() { + console.log('\n [x] Stopping everything! '); + let sessionIds = Object.keys(this._screenshareSessions); + + for (let i = 0; i < sessionIds.length; i++) { + this._stopSession(sessionIds[i]); + } + + setTimeout(process.exit, 1000); + } +}; diff --git a/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js b/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js new file mode 100644 index 0000000000000000000000000000000000000000..b2fb752838257ab48e17ce3f7794d144c51a718a --- /dev/null +++ b/labs/node-bbb-apps/lib/screenshare/ScreenshareProcess.js @@ -0,0 +1,13 @@ +const ScreenshareManager = require('./ScreenshareManager'); + +process.on('uncaughtException', function (error) { + console.log(error.stack); +}); + +process.on('disconnect',function() { + console.log("Parent exited!"); + process.kill(); +}); + +c = new ScreenshareManager(); +c.start(); diff --git a/labs/node-bbb-apps/lib/screenshare/screenshare.js b/labs/node-bbb-apps/lib/screenshare/screenshare.js new file mode 100644 index 0000000000000000000000000000000000000000..66ec882b31c8bb345efee2d76f1f407f60fbad5c --- /dev/null +++ b/labs/node-bbb-apps/lib/screenshare/screenshare.js @@ -0,0 +1,341 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +'use strict' + +// Imports +const C = require('../bbb/messages/Constants'); +const MediaHandler = require('../media-handler'); +const Messaging = require('../bbb/messages/Messaging'); +const moment = require('moment'); +const h264_sdp = require('../h264-sdp'); +const now = moment(); +const MediaController = require('../media-controller'); + +// Global stuff +var sharedScreens = {}; +var rtpEndpoints = {}; + +const kurento = require('kurento-client'); +const config = require('config'); +const kurentoUrl = config.get('kurentoUrl'); +const kurentoIp = config.get('kurentoIp'); +const localIpAddress = config.get('localIpAddress'); + +if (config.get('acceptSelfSignedCertificate')) { + process.env.NODE_TLS_REJECT_UNAUTHORIZED=0; +} + +module.exports = class Screenshare { + constructor(id, bbbgw, voiceBridge, caller, vh, vw, meetingId) { + this._id = id; + this._BigBlueButtonGW = bbbgw; + this._presenterEndpoint = null; + this._ffmpegRtpEndpoint = null; + this._voiceBridge = voiceBridge; + this._meetingId = meetingId; + this._caller = caller; + this._streamUrl = ""; + this._vw = vw; + this._vh = vh; + this._candidatesQueue = []; + this._viewersEndpoint = []; + this._viewersCandidatesQueue = []; + } + + // TODO isolate ICE + _onIceCandidate(_candidate) { + let candidate = kurento.getComplexType('IceCandidate')(_candidate); + + if (this._presenterEndpoint) { + this._presenterEndpoint.addIceCandidate(candidate); + } + else { + this._candidatesQueue.push(candidate); + } + }; + + _onViewerIceCandidate(_candidate, callerName) { + console.log("onviewericecandidate callerName = " + callerName); + let candidate = kurento.getComplexType('IceCandidate')(_candidate); + + if (this._viewersEndpoint[callerName]) { + this._viewersEndpoint[callerName].addIceCandidate(candidate); + } + else { + if (!this._viewersCandidatesQueue[callerName]) { + this._viewersCandidatesQueue[callerName] = []; + } + this._viewersCandidatesQueue[callerName].push(candidate); + } + } + + _startViewer(voiceBridge, sdp, callerName, presenterEndpoint, callback) { + let self = this; + let _callback = function(){}; + console.log("startviewer callerName = " + callerName); + self._viewersCandidatesQueue[callerName] = []; + + console.log("VIEWER VOICEBRIDGE: "+self._voiceBridge); + + MediaController.createMediaElement(voiceBridge, C.WebRTC, function(error, webRtcEndpoint) { + if (error) { + console.log("Media elements error" + error); + return _callback(error); + } + + self._viewersEndpoint[callerName] = webRtcEndpoint; + + // QUEUES UP ICE CANDIDATES IF NEGOTIATION IS NOT YET READY + while(self._viewersCandidatesQueue[callerName].length) { + let candidate = self._viewersCandidatesQueue[callerName].shift(); + MediaController.addIceCandidate(self._viewersEndpoint[callerName].id, candidate); + } + // CONNECTS TWO MEDIA ELEMENTS + MediaController.connectMediaElements(presenterEndpoint.id, self._viewersEndpoint[callerName].id, C.VIDEO, function(error) { + if (error) { + console.log("Media elements CONNECT error " + error); + //pipeline.release(); + return _callback(error); + } + }); + + // ICE NEGOTIATION WITH THE ENDPOINT + self._viewersEndpoint[callerName].on('OnIceCandidate', function(event) { + let candidate = kurento.getComplexType('IceCandidate')(event.candidate); + self._BigBlueButtonGW.publish(JSON.stringify({ + id : 'iceCandidate', + candidate : candidate + }), C.FROM_SCREENSHARE); + }); + + sdp = h264_sdp.transform(sdp); + // PROCESS A SDP OFFER + MediaController.processOffer(webRtcEndpoint.id, sdp, function(error, webRtcSdpAnswer) { + if (error) { + console.log(" [webrtc] processOffer error => " + error + " for SDP " + sdp); + //pipeline.release(); + return _callback(error); + } + self._BigBlueButtonGW.publish(JSON.stringify({ + id: "viewerResponse", + sdpAnswer: webRtcSdpAnswer, + response: "accepted" + }), C.FROM_SCREENSHARE); + console.log(" Sent sdp message to client with callerName:" + callerName); + + MediaController.gatherCandidates(webRtcEndpoint.id, function(error) { + if (error) { + return _callback(error); + } + + self._viewersEndpoint[callerName].on('MediaFlowInStateChange', function(event) { + if (event.state === 'NOT_FLOWING') { + console.log(" NOT FLOWING "); + } + else if (event.state === 'FLOWING') { + console.log(" FLOWING "); + } + }); + }); + }); + }); + } + + + _startPresenter(id, sdpOffer, callback) { + let self = this; + let _callback = callback; + + // Force H264 on Firefox and Chrome + sdpOffer = h264_sdp.transform(sdpOffer); + console.log("Starting presenter for " + sdpOffer); + console.log("PRESENTER VOICEBRIDGE: " + self._voiceBridge); + MediaController.createMediaElement(self._voiceBridge, C.WebRTC, function(error, webRtcEndpoint) { + if (error) { + console.log("Media elements error" + error); + return _callback(error); + } + MediaController.createMediaElement(self._voiceBridge, C.RTP, function(error, rtpEndpoint) { + if (error) { + console.log("Media elements error" + error); + return _callback(error); + } + + + while(self._candidatesQueue.length) { + let candidate = self._candidatesQueue.shift(); + MediaController.addIceCandidate(webRtcEndpoint.id, candidate); + } + + MediaController.connectMediaElements(webRtcEndpoint.id, rtpEndpoint.id, C.VIDEO, function(error) { + if (error) { + console.log("Media elements CONNECT error " + error); + //pipeline.release(); + return _callback(error); + } + + // It's a user sharing a Screen + sharedScreens[id] = webRtcEndpoint; + rtpEndpoints[id] = rtpEndpoint; + + // Store our endpoint + self._presenterEndpoint = webRtcEndpoint; + self._ffmpegRtpEndpoint = rtpEndpoint; + + self._presenterEndpoint.on('OnIceCandidate', function(event) { + let candidate = kurento.getComplexType('IceCandidate')(event.candidate); + self._BigBlueButtonGW.publish(JSON.stringify({ + id : 'iceCandidate', + cameraId: id, + candidate : candidate + }), C.FROM_SCREENSHARE); + }); + + MediaController.processOffer(webRtcEndpoint.id, sdpOffer, function(error, webRtcSdpAnswer) { + if (error) { + console.log(" [webrtc] processOffer error => " + error + " for SDP " + sdpOffer); + //pipeline.release(); + return _callback(error); + } + + let sendVideoPort = MediaHandler.getVideoPort(); + + let rtpSdpOffer = MediaHandler.generateVideoSdp(localIpAddress, sendVideoPort); + console.log(" [rtpendpoint] RtpEndpoint processing => " + rtpSdpOffer); + + MediaController.gatherCandidates(webRtcEndpoint.id, function(error) { + if (error) { + return _callback(error); + } + + MediaController.processOffer(rtpEndpoint.id, rtpSdpOffer, function(error, rtpSdpAnswer) { + if (error) { + console.log(" [rtpendpoint] processOffer error => " + error + " for SDP " + rtpSdpOffer); + //pipeline.release(); + return _callback(error); + } + + console.log(" [rtpendpoint] KMS answer SDP => " + rtpSdpAnswer); + let recvVideoPort = rtpSdpAnswer.match(/m=video\s(\d*)/)[1]; + let rtpParams = MediaHandler.generateTranscoderParams(kurentoIp, localIpAddress, + sendVideoPort, recvVideoPort, self._meetingId, "stream_type_video", C.RTP_TO_RTMP, "copy", "caller"); + + self._ffmpegRtpEndpoint.on('MediaFlowInStateChange', function(event) { + if (event.state === 'NOT_FLOWING') { + self._onRtpMediaNotFlowing(); + } + else if (event.state === 'FLOWING') { + self._onRtpMediaFlowing(self._meetingId, rtpParams); + } + }); + return _callback(null, webRtcSdpAnswer); + }); + }); + }); + }); + }); + }); + }; + + _stop() { + + console.log(' [stop] Releasing endpoints for ' + this._id); + + this._stopScreensharing(); + + if (this._presenterEndpoint) { + MediaController.releaseMediaElement(this._presenterEndpoint.id); + this._presenterEndpoint = null; + } else { + console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); + } + + if (this._ffmpegRtpEndpoint) { + MediaController.releaseMediaElement(this._ffmpegRtpEndpoint.id); + this._ffmpegRtpEndpoint = null; + } else { + console.log(" [rtpEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); + } + + console.log(' [stop] Screen is shared, releasing ' + this._id); + + delete sharedScreens[this._id]; + + delete this._candidatesQueue; + }; + + _stopScreensharing() { + let self = this; + let strm = Messaging.generateStopTranscoderRequestMessage(this._meetingId, this._meetingId); + + self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN); + + // Interoperability: capturing 1.1 stop_transcoder_reply messages + self._BigBlueButtonGW.once(C.STOP_TRANSCODER_REPLY, function(payload) { + let meetingId = payload[C.MEETING_ID]; + self._stopRtmpBroadcast(meetingId); + }); + + // Capturing stop transcoder responses from the 2x model + self._BigBlueButtonGW.once(C.STOP_TRANSCODER_RESP_2x, function(payload) { + let meetingId = payload[C.MEETING_ID_2x]; + self._stopRtmpBroadcast(meetingId); + }); + + } + + _onRtpMediaFlowing(meetingId, rtpParams) { + let self = this; + let strm = Messaging.generateStartTranscoderRequestMessage(meetingId, meetingId, rtpParams); + + // Interoperability: capturing 1.1 start_transcoder_reply messages + self._BigBlueButtonGW.once(C.START_TRANSCODER_REPLY, function(payload) { + let meetingId = payload[C.MEETING_ID]; + let output = payload["params"].output; + self._startRtmpBroadcast(meetingId, output); + }); + + // Capturing stop transcoder responses from the 2x model + self._BigBlueButtonGW.once(C.START_TRANSCODER_RESP_2x, function(payload) { + let meetingId = payload[C.MEETING_ID_2x]; + let output = payload["params"].output; + self._startRtmpBroadcast(meetingId, output); + }); + + + self._BigBlueButtonGW.publish(strm, C.TO_BBB_TRANSCODE_SYSTEM_CHAN); + }; + + _stopRtmpBroadcast (meetingId) { + var self = this; + if(self._meetingId === meetingId) { + // TODO correctly assemble this timestamp + let timestamp = now.format('hhmmss'); + let dsrstom = Messaging.generateScreenshareRTMPBroadcastStoppedEvent2x(self._voiceBridge, + self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp); + self._BigBlueButtonGW.publish(dsrstom, C.FROM_VOICE_CONF_SYSTEM_CHAN); + } + } + + _startRtmpBroadcast (meetingId, output) { + var self = this; + if(self._meetingId === meetingId) { + // TODO correctly assemble this timestamp + let timestamp = now.format('hhmmss'); + self._streamUrl = MediaHandler.generateStreamUrl(localIpAddress, meetingId, output); + let dsrbstam = Messaging.generateScreenshareRTMPBroadcastStartedEvent2x(self._voiceBridge, + self._voiceBridge, self._streamUrl, self._vw, self._vh, timestamp); + + self._BigBlueButtonGW.publish(dsrbstam, C.FROM_VOICE_CONF_SYSTEM_CHAN); + } + } + + _onRtpMediaNotFlowing() { + console.log(" [screenshare] TODO RTP NOT_FLOWING"); + }; +}; diff --git a/labs/node-bbb-apps/lib/video/VideoManager.js b/labs/node-bbb-apps/lib/video/VideoManager.js new file mode 100755 index 0000000000000000000000000000000000000000..5bd40e44e9ca44704d12588ab9ba01a97a4bcc44 --- /dev/null +++ b/labs/node-bbb-apps/lib/video/VideoManager.js @@ -0,0 +1,159 @@ +/* + * Lucas Fialho Zawacki + * (C) Copyright 2017 Bigbluebutton + * + */ + +var cookieParser = require('cookie-parser') +var express = require('express'); +var session = require('express-session') +var ws = require('./websocket'); +var http = require('http'); +var fs = require('fs'); + +var Video = require('./video'); + +// Global variables +var app = express(); +var sessions = {}; + +/* + * Management of sessions + */ +app.use(cookieParser()); + +var sessionHandler = session({ + secret : 'Shawarma', rolling : true, resave : true, saveUninitialized : true +}); + +app.use(sessionHandler); + +/* + * Server startup + */ +var server = http.createServer(app).listen(3002, function() { + console.log(' [*] Running bbb-html5 kurento video service.'); +}); + +var wss = new ws.Server({ + server : server, + path : '/html5video' +}); + +var clientId = 0; + +wss.on('connection', function(ws) { + var sessionId; + var request = ws.upgradeReq; + var response = { + writeHead : {} + }; + + sessionHandler(request, response, function(err) { + sessionId = request.session.id + "_" + clientId++; + + if (!sessions[sessionId]) { + sessions[sessionId] = {}; + } + + console.log('Connection received with sessionId ' + sessionId); + }); + + ws.on('error', function(error) { + console.log('Connection ' + sessionId + ' error'); + // stop(sessionId); + }); + + ws.on('close', function() { + console.log('Connection ' + sessionId + ' closed'); + stopSession(sessionId); + }); + + ws.on('message', function(_message) { + var message = JSON.parse(_message); + + var video; + if (message.cameraId && sessions[sessionId][message.cameraId]) { + video = sessions[sessionId][message.cameraId]; + } + + switch (message.id) { + + case 'start': + + console.log('[' + message.id + '] connection ' + sessionId); + + var video = new Video(ws, message.cameraId, message.cameraShared); + sessions[sessionId][message.cameraId] = video; + + video.start(message.sdpOffer, function(error, sdpAnswer) { + if (error) { + return ws.sendMessage({id : 'error', message : error }); + } + + ws.sendMessage({id : 'startResponse', cameraId: message.cameraId, sdpAnswer : sdpAnswer}); + }); + + break; + + case 'stop': + + console.log('[' + message.id + '] connection ' + sessionId); + + if (video) { + video.stop(sessionId); + } else { + console.log(" [stop] Why is there no video on STOP?"); + } + break; + + case 'onIceCandidate': + + if (video) { + video.onIceCandidate(message.candidate); + } else { + console.log(" [iceCandidate] Why is there no video on ICE CANDIDATE?"); + } + break; + + default: + ws.sendMessage({ id : 'error', message : 'Invalid message ' + message }); + break; + } + + }); +}); + +var stopSession = function(sessionId) { + + console.log(' [>] Stopping session ' + sessionId); + + var videoIds = Object.keys(sessions[sessionId]); + + for (var i = 0; i < videoIds.length; i++) { + + var video = sessions[sessionId][videoIds[i]]; + video.stop(); + + delete sessions[sessionId][videoIds[i]]; + } + + delete sessions[sessionId]; +} + +var stopAll = function() { + + console.log('\n [x] Stopping everything! '); + + var sessionIds = Object.keys(sessions); + + for (var i = 0; i < sessionIds.length; i++) { + + stopSession(sessionIds[i]); + } + + setTimeout(process.exit, 1000); +} + +process.on('SIGTERM', stopAll); +process.on('SIGINT', stopAll); diff --git a/labs/node-bbb-apps/lib/video/VideoProcess.js b/labs/node-bbb-apps/lib/video/VideoProcess.js new file mode 100644 index 0000000000000000000000000000000000000000..af1f3b35daa3ad3748175c80254626e2bfe6f113 --- /dev/null +++ b/labs/node-bbb-apps/lib/video/VideoProcess.js @@ -0,0 +1,10 @@ +const VideoManager = require('./VideoManager'); + +process.on('uncaughtException', function (error) { + console.log(error.stack); +}); + +process.on('disconnect',function() { + console.log("Parent exited!"); + process.kill(); +}); diff --git a/labs/node-bbb-apps/lib/video/video.js b/labs/node-bbb-apps/lib/video/video.js new file mode 100644 index 0000000000000000000000000000000000000000..f2c225382b4c8c16958dffcb78a190ba9a0ad43c --- /dev/null +++ b/labs/node-bbb-apps/lib/video/video.js @@ -0,0 +1,232 @@ +// Global stuff +var mediaPipelines = {}; +var sharedWebcams = {}; + +// TODO Later +// var loadBalancer = require('') +const kurento = require('kurento-client'); +const config = require('config'); +const kurentoUrl = config.get('kurentoUrl'); + +if (config.get('acceptSelfSignedCertificate')) { + process.env.NODE_TLS_REJECT_UNAUTHORIZED=0; +} + +var kurentoClient = null; + +function getKurentoClient(callback) { + + if (kurentoClient !== null) { + return callback(null, kurentoClient); + } + + kurento(kurentoUrl, function(error, _kurentoClient) { + if (error) { + console.log("Could not find media server at address " + kurentoUrl); + return callback("Could not find media server at address" + kurentoUrl + ". Exiting with error " + error); + } + + console.log(" [server] Initiating kurento client. Connecting to: " + kurentoUrl); + + kurentoClient = _kurentoClient; + callback(null, kurentoClient); + }); +} + +function getMediaPipeline(id, callback) { + + console.log(' [media] Creating media pipeline for ' + id); + + if (mediaPipelines[id]) { + + console.log(' [media] Pipeline already exists.'); + + callback(null, mediaPipelines[id]); + + } else { + + kurentoClient.create('MediaPipeline', function(err, pipeline) { + + mediaPipelines[id] = pipeline; + + return callback(err, pipeline); + }); + + } + +} + +function Video(_ws, _id, _shared) { + + var ws = _ws; + var id = _id; + var shared = _shared; + var webRtcEndpoint = null; + + var candidatesQueue = []; + + this.onIceCandidate = function(_candidate) { + var candidate = kurento.getComplexType('IceCandidate')(_candidate); + + if (webRtcEndpoint) { + webRtcEndpoint.addIceCandidate(candidate); + } + else { + candidatesQueue.push(candidate); + } + }; + + this.start = function(sdpOffer, callback) { + + getKurentoClient(function(error, kurentoClient) { + + if (error) { + return callback(error); + } + + getMediaPipeline(id, function(error, pipeline) { + + if (error) { + return callback(error); + } + + createMediaElements(pipeline, function(error, _webRtcEndpoint) { + + if (error) { + pipeline.release(); + return callback(error); + } + + while(candidatesQueue.length) { + var candidate = candidatesQueue.shift(); + _webRtcEndpoint.addIceCandidate(candidate); + } + + var flowInOut = function(event) { + console.log(' [=] ' + event.type + ' for endpoint ' + id); + + if (event.state === 'NOT_FLOWING') { + ws.sendMessage({ id : 'playStop', cameraId : id }); + } else if (event.state === 'FLOWING') { + ws.sendMessage({ id : 'playStart', cameraId : id }); + } + }; + + _webRtcEndpoint.on('MediaFlowInStateChange', flowInOut); + _webRtcEndpoint.on('MediaFlowOutStateChange', flowInOut); + + connectMediaElements(_webRtcEndpoint, function(error) { + + if (error) { + pipeline.release(); + return callback(error); + } + + // It's a user sharing a webcam + if (shared) { + sharedWebcams[id] = _webRtcEndpoint; + } + + // Store our endpoint + webRtcEndpoint = _webRtcEndpoint; + + _webRtcEndpoint.on('OnIceCandidate', function(event) { + var candidate = kurento.getComplexType('IceCandidate')(event.candidate); + ws.sendMessage({ id : 'iceCandidate', cameraId: id, candidate : candidate }); + }); + + _webRtcEndpoint.processOffer(sdpOffer, function(error, sdpAnswer) { + if (error) { + pipeline.release(); + return callback(error); + } + + return callback(null, sdpAnswer); + }); + + _webRtcEndpoint.gatherCandidates(function(error) { + if (error) { + return callback(error); + } + }); + }); + }); + }); + }); + }; + + var createMediaElements = function(pipeline, callback) { + + console.log(" [webrtc] Creating webrtc endpoint"); + + pipeline.create('WebRtcEndpoint', function(error, _webRtcEndpoint) { + + if (error) { + return callback(error); + } + + webRtcEndpoint = _webRtcEndpoint; + + return callback(null, _webRtcEndpoint); + }); + }; + + var connectMediaElements = function(webRtcEndpoint, callback) { + + // User is sharing webcam (sendOnly connection from the client) + if (shared) { + console.log(" [webrtc] User has shared the webcam, no connection needed"); + // Dont connect this, just create the webrtcEndpoint + // webRtcEndpoint.connect(webRtcEndpoint, callback); + + return callback(null); + } else { + + console.log(" [webrtc] User wants to receive webcam "); + + if (sharedWebcams[id]) { + var wRtc = sharedWebcams[id]; + + wRtc.connect(webRtcEndpoint, function(error) { + + if (error) { + return callback(error); + } + return callback(null); + }); + } + + } + }; + + this.stop = function() { + + console.log(' [stop] Releasing webrtc endpoint for ' + id); + + if (webRtcEndpoint) { + webRtcEndpoint.release(); + webRtcEndpoint = null; + } else { + console.log(" [webRtcEndpoint] PLEASE DONT TRY STOPPING THINGS TWICE"); + } + + if (shared) { + console.log(' [stop] Webcam is shared, releasing ' + id); + + if (mediaPipelines[id]) { + mediaPipelines[id].release(); + } else { + console.log(" [mediaPipeline] PLEASE DONT TRY STOPPING THINGS TWICE"); + } + + delete mediaPipelines[id]; + delete sharedWebcams[id]; + } + + delete candidatesQueue; + }; + + return this; +}; + +module.exports = Video; \ No newline at end of file diff --git a/labs/node-bbb-apps/lib/video/websocket.js b/labs/node-bbb-apps/lib/video/websocket.js new file mode 100644 index 0000000000000000000000000000000000000000..c4fe9f6f18b220e8b4c43be58ec75004a6430124 --- /dev/null +++ b/labs/node-bbb-apps/lib/video/websocket.js @@ -0,0 +1,18 @@ +/* + * Simple wrapper around the ws library + * + */ + +var ws = require('ws'); + +ws.prototype.sendMessage = function(json) { + + return this.send(JSON.stringify(json), function(error) { + if(error) + console.log(' [server] Websocket error "' + error + '" on message "' + json.id + '"'); + }); + +}; + + +module.exports = ws; \ No newline at end of file diff --git a/labs/node-bbb-apps/lib/websocket.js b/labs/node-bbb-apps/lib/websocket.js new file mode 100644 index 0000000000000000000000000000000000000000..c4fe9f6f18b220e8b4c43be58ec75004a6430124 --- /dev/null +++ b/labs/node-bbb-apps/lib/websocket.js @@ -0,0 +1,18 @@ +/* + * Simple wrapper around the ws library + * + */ + +var ws = require('ws'); + +ws.prototype.sendMessage = function(json) { + + return this.send(JSON.stringify(json), function(error) { + if(error) + console.log(' [server] Websocket error "' + error + '" on message "' + json.id + '"'); + }); + +}; + + +module.exports = ws; \ No newline at end of file diff --git a/labs/node-bbb-apps/server.js b/labs/node-bbb-apps/server.js new file mode 100755 index 0000000000000000000000000000000000000000..ccdcdecca518400506a6ba56c7e83b555343f704 --- /dev/null +++ b/labs/node-bbb-apps/server.js @@ -0,0 +1,68 @@ +/* + * Lucas Fialho Zawacki + * Paulo Renato Lanzarin + * (C) Copyright 2017 Bigbluebutton + * + */ + +const ConnectionManager = require('./lib/connection-manager/ConnectionManager'); +const HttpServer = require('./lib/connection-manager/HttpServer'); +const server = new HttpServer(); +const WebsocketConnectionManager = require('./lib/connection-manager/WebsocketConnectionManager'); +const cp = require('child_process'); + +let screenshareProc = cp.fork('./lib/screenshare/ScreenshareProcess', { + // Pass over all of the environment. + env: process.ENV, + // Share stdout/stderr, so we can hear the inevitable errors. + silent: false +}); + +let videoProc = cp.fork('./lib/video/VideoProcess.js', { + // Pass over all of the environment. + env: process.ENV, + // Share stdout/stderr, so we can hear the inevitable errors. + silent: false +}); + +let onMessage = function (message) { + console.log('event','child message',this.pid,message); +}; + +let onError = function(e) { + console.log('event','child error',this.pid,e); +}; + +let onDisconnect = function(e) { + console.log(e); + console.log('event','child disconnect',this.pid,'killing...'); + this.kill(); +}; + +screenshareProc.on('message',onMessage); +screenshareProc.on('error',onError); +screenshareProc.on('disconnect',onDisconnect); + +videoProc.on('message',onMessage); +videoProc.on('error',onError); +videoProc.on('disconnect',onDisconnect); + +const CM = new ConnectionManager(screenshareProc, videoProc); + +let websocketManager = new WebsocketConnectionManager(server.getServerObject(), '/kurento-screenshare'); + +process.on('SIGTERM', process.exit) +process.on('SIGINT', process.exit) +process.on('uncaughtException', function (error) { + console.log(error.stack); +}); + + +CM.setHttpServer(server); +CM.addAdapter(websocketManager); + +CM.listen(() => { + console.log(" [SERVER] Server started"); +}); + +