From 16bfcb9f198b7ac63ca9e5779cd0ff3a2091ebe0 Mon Sep 17 00:00:00 2001
From: prlanzarin <prlanzarin@inf.ufrgs.br>
Date: Tue, 9 Oct 2018 22:12:38 +0000
Subject: [PATCH] Making further use of the SFU-KMS websocket pool by making
 the kurento adapter a bit more stateless

---
 .../mcs-core/lib/adapters/kurento/kurento.js  | 140 ++++++++++--------
 1 file changed, 82 insertions(+), 58 deletions(-)

diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
index 9a8e7b854a..cb3bf47e29 100644
--- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
+++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
@@ -134,20 +134,25 @@ module.exports = class MediaServer extends EventEmitter {
   _getMediaPipeline (roomId) {
     return new Promise((resolve, reject) => {
       try {
+        let mediaServer = this._getClientFromPool();
         if (this._mediaPipelines[roomId]) {
-          Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.');
-          return resolve(this._mediaPipelines[roomId]);
+          mediaServer.getMediaobjectById(this._mediaPipelines[roomId].id, (error, pipeline) => {
+            if (error) {
+              return reject(this._handleError(error));
+            }
+            Logger.warn('[mcs-media] Pipeline for', roomId, 'already exists.', pipeline.id);
+            return resolve(pipeline);
+          });
+        } else {
+          mediaServer.create('MediaPipeline', (error, pipeline) => {
+            if (error) {
+              return reject(this._handleError(error));
+            }
+            this._mediaPipelines[roomId] = pipeline;
+            pipeline.activeElements = {};
+            resolve(pipeline);
+          });
         }
-
-        let mediaServer = this._getClientFromPool();
-        mediaServer.create('MediaPipeline', (error, pipeline) => {
-          if (error) {
-            return reject(this._handleError(error));
-          }
-          this._mediaPipelines[roomId] = pipeline;
-          pipeline.activeElements = 0;
-          resolve(pipeline);
-        });
       }
       catch (err) {
         return reject(this._handleError(err));
@@ -156,7 +161,7 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   _releasePipeline (room) {
-    return new Promise((resolve, reject) => {
+    return new Promise(async (resolve, reject) => {
       try {
         Logger.debug("[mcs-media] Releasing room", room, "pipeline");
         const pipeline = this._mediaPipelines[room];
@@ -204,7 +209,7 @@ module.exports = class MediaServer extends EventEmitter {
           Logger.debug("[mcs-media] Creating element with keyframe interval set to", options.keyframeInterval);
           mediaElement.setKeyframeInterval(options.keyframeInterval);
         }
-        this._mediaPipelines[roomId].activeElements++;
+        this._mediaPipelines[roomId].activeElements[mediaElement.id] = mediaElement.id;
         resolve(mediaElement.id);
       }
       catch (err) {
@@ -213,13 +218,32 @@ module.exports = class MediaServer extends EventEmitter {
     });
   }
 
-  async startRecording (sourceId) {
-    const source = this._mediaElements[sourceId];
+  _getMediaElement (elementId) {
     return new Promise((resolve, reject) => {
-      if (source == null) {
-        return reject(this._handleError(ERRORS[40101]));
+      try {
+        let mediaServer = this._getClientFromPool();
+        if (this._mediaElements[elementId]) {
+          mediaServer.getMediaobjectById(elementId, (error, element) => {
+            if (error) {
+              return reject(this._handleError(error));
+            }
+            Logger.warn('[mcs-media] Element', elementId, 'found');
+            return resolve(element);
+          });
+        } else {
+          return reject(this._handleError(ERRORS[40101]));
+        }
+      }
+      catch (err) {
+        return reject(this._handleError(err));
       }
+    });
+  }
+
+  async startRecording (sourceId) {
+    return new Promise(async (resolve, reject) => {
       try {
+        const source = await this._getMediaElement(sourceId);
         source.record((err) => {
           if (err) {
             return reject(this._handleError(err));
@@ -234,13 +258,9 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   async _stopRecording (sourceId) {
-    const source = this._mediaElements[sourceId];
-
-    return new Promise((resolve, reject) => {
-      if (source == null) {
-        return reject(this._handleError(ERRORS[40101]));
-      }
+    return new Promise(async (resolve, reject) => {
       try {
+        const source = await this._getMediaElement(sourceId);
         source.stopAndWait((err) => {
           if (err) {
             return reject(this._handleError(err));
@@ -255,14 +275,11 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   async connect (sourceId, sinkId, type) {
-    const source = this._mediaElements[sourceId];
-    const sink = this._mediaElements[sinkId];
-
-    return new Promise((resolve, reject) => {
-      if (source == null || sink == null) {
-        return reject(this._handleError(ERRORS[40101]));
-      }
+    return new Promise(async (resolve, reject) => {
       try {
+        const source = await this._getMediaElement(sourceId);
+        const sink = await this._getMediaElement(sinkId);
+
         switch (type) {
           case 'ALL':
             source.connect(sink, (error) => {
@@ -302,14 +319,11 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   async disconnect (sourceId, sinkId, type) {
-    const source = this._mediaElements[sourceId];
-    const sink = this._mediaElements[sinkId];
-
-    return new Promise((resolve, reject) => {
-      if (source == null || sink == null) {
-        return reject(this._handleError(ERRORS[40101]));
-      }
+    return new Promise(async (resolve, reject) => {
       try {
+        const source = await this._getMediaElement(sourceId);
+        const sink = await this._getMediaElement(sinkId);
+
         switch (type) {
           case 'ALL':
             source.disconnect(sink, (error) => {
@@ -348,11 +362,26 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   stop (room, type, elementId) {
+    const pipeline = this._mediaPipelines[room];
+    const cleanPipeline = async (p) => {
+      if (p) {
+        if (p.activeElements[elementId]) {
+          delete p.activeElements[elementId];
+        }
+
+        const activeElements = Object.keys(p.activeElements).length;
+
+        Logger.info("[mcs-media] Pipeline has a total of", activeElements, "active elements");
+        if (activeElements <= 0) {
+          await this._releasePipeline(room);
+        }
+      }
+    }
+
     return new Promise(async (resolve, reject) => {
       try {
         Logger.info("[mcs-media] Releasing endpoint", elementId, "from room", room);
-        const mediaElement = this._mediaElements[elementId];
-        const pipeline = this._mediaPipelines[room];
+        const mediaElement = await this._getMediaElement(elementId);
 
         if (type === 'RecorderEndpoint') {
           await this._stopRecording(elementId);
@@ -363,26 +392,23 @@ module.exports = class MediaServer extends EventEmitter {
             if (error) {
               return reject(this._handleError(error));
             }
-
             delete this._mediaElements[elementId];
 
-            if (pipeline) {
-              pipeline.activeElements--;
+            cleanPipeline(pipeline);
 
-              Logger.info("[mcs-media] Pipeline has a total of", pipeline.activeElements, "active elements");
-              if (pipeline.activeElements <= 0) {
-                await this._releasePipeline(room);
-              }
-            }
             return resolve();
           });
         }
         else {
           Logger.warn("[mcs-media] Media element", elementId, "could not be found to stop");
+
+          cleanPipeline(pipeline);
+
           return resolve();
         }
       }
       catch (err) {
+        cleanPipeline(pipeline);
         this._handleError(err);
         resolve();
       }
@@ -390,9 +416,10 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   addIceCandidate (elementId, candidate) {
-    return new Promise((resolve, reject) => {
-      const mediaElement = this._mediaElements[elementId];
+    return new Promise(async (resolve, reject) => {
       try {
+        const mediaElement = await this._getMediaElement(elementId);
+
         if (mediaElement  && candidate) {
           mediaElement.addIceCandidate(candidate, (error) => {
             if (error) {
@@ -414,13 +441,10 @@ module.exports = class MediaServer extends EventEmitter {
 
   gatherCandidates (elementId) {
     Logger.info('[mcs-media] Gathering ICE candidates for ' + elementId);
-    const mediaElement = this._mediaElements[elementId];
 
-    return new Promise((resolve, reject) => {
+    return new Promise(async (resolve, reject) => {
       try {
-        if (mediaElement == null) {
-          return reject(this._handleError(ERRORS[40101]));
-        }
+        const mediaElement = await this._getMediaElement(elementId);
         mediaElement.gatherCandidates((error) => {
           if (error) {
             return reject(this._handleError(error));
@@ -469,10 +493,10 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   processOffer (elementId, sdpOffer) {
-    const mediaElement = this._mediaElements[elementId];
-
-    return new Promise((resolve, reject) => {
+    return new Promise(async (resolve, reject) => {
       try {
+        const mediaElement = await this._getMediaElement(elementId);
+
         if (mediaElement) {
           mediaElement.processOffer(sdpOffer, (error, answer) => {
             if (error) {
-- 
GitLab