diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js index 9a8e7b854a08e041402ec2fead37e8aaf94ddc7b..cb3bf47e2915467b9aa6d34edc4c66a2d38f68ea 100644 --- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js +++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js @@ -134,20 +134,25 @@ module.exports = class MediaServer extends EventEmitter { _getMediaPipeline (roomId) { return new Promise((resolve, reject) => { try { + let mediaServer = this._getClientFromPool(); if (this._mediaPipelines[roomId]) { - Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.'); - return resolve(this._mediaPipelines[roomId]); + mediaServer.getMediaobjectById(this._mediaPipelines[roomId].id, (error, pipeline) => { + if (error) { + return reject(this._handleError(error)); + } + Logger.warn('[mcs-media] Pipeline for', roomId, 'already exists.', pipeline.id); + return resolve(pipeline); + }); + } else { + mediaServer.create('MediaPipeline', (error, pipeline) => { + if (error) { + return reject(this._handleError(error)); + } + this._mediaPipelines[roomId] = pipeline; + pipeline.activeElements = {}; + resolve(pipeline); + }); } - - let mediaServer = this._getClientFromPool(); - mediaServer.create('MediaPipeline', (error, pipeline) => { - if (error) { - return reject(this._handleError(error)); - } - this._mediaPipelines[roomId] = pipeline; - pipeline.activeElements = 0; - resolve(pipeline); - }); } catch (err) { return reject(this._handleError(err)); @@ -156,7 +161,7 @@ module.exports = class MediaServer extends EventEmitter { } _releasePipeline (room) { - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { try { Logger.debug("[mcs-media] Releasing room", room, "pipeline"); const pipeline = this._mediaPipelines[room]; @@ -204,7 +209,7 @@ module.exports = class MediaServer extends EventEmitter { Logger.debug("[mcs-media] Creating element with keyframe interval set to", options.keyframeInterval); mediaElement.setKeyframeInterval(options.keyframeInterval); } - this._mediaPipelines[roomId].activeElements++; + this._mediaPipelines[roomId].activeElements[mediaElement.id] = mediaElement.id; resolve(mediaElement.id); } catch (err) { @@ -213,13 +218,32 @@ module.exports = class MediaServer extends EventEmitter { }); } - async startRecording (sourceId) { - const source = this._mediaElements[sourceId]; + _getMediaElement (elementId) { return new Promise((resolve, reject) => { - if (source == null) { - return reject(this._handleError(ERRORS[40101])); + try { + let mediaServer = this._getClientFromPool(); + if (this._mediaElements[elementId]) { + mediaServer.getMediaobjectById(elementId, (error, element) => { + if (error) { + return reject(this._handleError(error)); + } + Logger.warn('[mcs-media] Element', elementId, 'found'); + return resolve(element); + }); + } else { + return reject(this._handleError(ERRORS[40101])); + } + } + catch (err) { + return reject(this._handleError(err)); } + }); + } + + async startRecording (sourceId) { + return new Promise(async (resolve, reject) => { try { + const source = await this._getMediaElement(sourceId); source.record((err) => { if (err) { return reject(this._handleError(err)); @@ -234,13 +258,9 @@ module.exports = class MediaServer extends EventEmitter { } async _stopRecording (sourceId) { - const source = this._mediaElements[sourceId]; - - return new Promise((resolve, reject) => { - if (source == null) { - return reject(this._handleError(ERRORS[40101])); - } + return new Promise(async (resolve, reject) => { try { + const source = await this._getMediaElement(sourceId); source.stopAndWait((err) => { if (err) { return reject(this._handleError(err)); @@ -255,14 +275,11 @@ module.exports = class MediaServer extends EventEmitter { } async connect (sourceId, sinkId, type) { - const source = this._mediaElements[sourceId]; - const sink = this._mediaElements[sinkId]; - - return new Promise((resolve, reject) => { - if (source == null || sink == null) { - return reject(this._handleError(ERRORS[40101])); - } + return new Promise(async (resolve, reject) => { try { + const source = await this._getMediaElement(sourceId); + const sink = await this._getMediaElement(sinkId); + switch (type) { case 'ALL': source.connect(sink, (error) => { @@ -302,14 +319,11 @@ module.exports = class MediaServer extends EventEmitter { } async disconnect (sourceId, sinkId, type) { - const source = this._mediaElements[sourceId]; - const sink = this._mediaElements[sinkId]; - - return new Promise((resolve, reject) => { - if (source == null || sink == null) { - return reject(this._handleError(ERRORS[40101])); - } + return new Promise(async (resolve, reject) => { try { + const source = await this._getMediaElement(sourceId); + const sink = await this._getMediaElement(sinkId); + switch (type) { case 'ALL': source.disconnect(sink, (error) => { @@ -348,11 +362,26 @@ module.exports = class MediaServer extends EventEmitter { } stop (room, type, elementId) { + const pipeline = this._mediaPipelines[room]; + const cleanPipeline = async (p) => { + if (p) { + if (p.activeElements[elementId]) { + delete p.activeElements[elementId]; + } + + const activeElements = Object.keys(p.activeElements).length; + + Logger.info("[mcs-media] Pipeline has a total of", activeElements, "active elements"); + if (activeElements <= 0) { + await this._releasePipeline(room); + } + } + } + return new Promise(async (resolve, reject) => { try { Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room); - const mediaElement = this._mediaElements[elementId]; - const pipeline = this._mediaPipelines[room]; + const mediaElement = await this._getMediaElement(elementId); if (type === 'RecorderEndpoint') { await this._stopRecording(elementId); @@ -363,26 +392,23 @@ module.exports = class MediaServer extends EventEmitter { if (error) { return reject(this._handleError(error)); } - delete this._mediaElements[elementId]; - if (pipeline) { - pipeline.activeElements--; + cleanPipeline(pipeline); - Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements"); - if (pipeline.activeElements <= 0) { - await this._releasePipeline(room); - } - } return resolve(); }); } else { Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop"); + + cleanPipeline(pipeline); + return resolve(); } } catch (err) { + cleanPipeline(pipeline); this._handleError(err); resolve(); } @@ -390,9 +416,10 @@ module.exports = class MediaServer extends EventEmitter { } addIceCandidate (elementId, candidate) { - return new Promise((resolve, reject) => { - const mediaElement = this._mediaElements[elementId]; + return new Promise(async (resolve, reject) => { try { + const mediaElement = await this._getMediaElement(elementId); + if (mediaElement && candidate) { mediaElement.addIceCandidate(candidate, (error) => { if (error) { @@ -414,13 +441,10 @@ module.exports = class MediaServer extends EventEmitter { gatherCandidates (elementId) { Logger.info('[mcs-media] Gathering ICE candidates for ' + elementId); - const mediaElement = this._mediaElements[elementId]; - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { try { - if (mediaElement == null) { - return reject(this._handleError(ERRORS[40101])); - } + const mediaElement = await this._getMediaElement(elementId); mediaElement.gatherCandidates((error) => { if (error) { return reject(this._handleError(error)); @@ -469,10 +493,10 @@ module.exports = class MediaServer extends EventEmitter { } processOffer (elementId, sdpOffer) { - const mediaElement = this._mediaElements[elementId]; - - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { try { + const mediaElement = await this._getMediaElement(elementId); + if (mediaElement) { mediaElement.processOffer(sdpOffer, (error, answer) => { if (error) {