From a7da78086f5c8aeac8545fa9aecf2d46ece86fb8 Mon Sep 17 00:00:00 2001
From: prlanzarin <prlanzarin@inf.ufrgs.br>
Date: Tue, 9 Oct 2018 16:59:42 +0000
Subject: [PATCH] Make SFU connect to Kurento using multiple websockets

---
 .../mcs-core/lib/adapters/kurento/kurento.js  | 68 ++++++++++++++-----
 1 file changed, 51 insertions(+), 17 deletions(-)

diff --git a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
index c5bde42778..9a8e7b854a 100644
--- a/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
+++ b/labs/bbb-webrtc-sfu/lib/mcs-core/lib/adapters/kurento/kurento.js
@@ -7,6 +7,7 @@ const EventEmitter = require('events').EventEmitter;
 const Logger = require('../../../../utils/Logger');
 const isError = require('../../utils/util').isError;
 const ERRORS = require('./errors.js');
+const KURENTO_WEBSOCKET_POOL_SIZE = config.get('kurento-websocket-pool-size');
 
 let instance = null;
 
@@ -17,8 +18,8 @@ module.exports = class MediaServer extends EventEmitter {
       this._serverUri = serverUri;
       this._mediaPipelines = {};
       this._mediaElements = {};
-      this._mediaServer;
-      this._status;
+      this._mediaServers;
+      this._status = C.STATUS.STOPPED;
       this._reconnectionRoutine = null;
       instance = this;
     }
@@ -29,9 +30,14 @@ module.exports = class MediaServer extends EventEmitter {
   init () {
     return new Promise(async (resolve, reject) => {
       try {
-        if (!this._mediaServer) {
-          this._mediaServer = await this._getMediaServerClient(this._serverUri);
-          Logger.info("[mcs-media] Retrieved media server client => " + this._mediaServer);
+        if (this._mediaServers == null || this._mediaServers.length === 0) {
+          this._mediaServers = [];
+          for (var i = 0; i < KURENTO_WEBSOCKET_POOL_SIZE; i++) {
+            let client = await this._getMediaServerClient(this._serverUri);
+            this._mediaServers.push(client);
+          }
+          Logger.info("[mcs-media] Retrieved", this._mediaServers.length, "media server clients");
+          this._status = C.STATUS.STARTING;
           this._monitorConnectionState();
           return resolve();
         }
@@ -56,10 +62,14 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   _monitorConnectionState () {
-    Logger.debug('[mcs-media] Monitoring connection state');
     try {
-      this._mediaServer.on('disconnect', this._onDisconnection.bind(this));
-      this._mediaServer.on('reconnected',this._onReconnection.bind(this));
+      if (this._mediaServers) {
+        Logger.debug('[mcs-media] Monitoring connection state');
+        this._mediaServers.forEach(ms => {
+          ms.on('disconnect', this._onDisconnection.bind(this));
+          ms.on('reconnected',this._onReconnection.bind(this));
+        });
+       }
     }
     catch (err) {
       this._handleError(err);
@@ -67,37 +77,59 @@ module.exports = class MediaServer extends EventEmitter {
   }
 
   _onDisconnection () {
-    Logger.error('[mcs-media] Media server was disconnected for some reason, will have to clean up all elements and notify users');
-    this._destroyElements();
-    this._destroyMediaServer();
-    this.emit(C.ERROR.MEDIA_SERVER_OFFLINE);
-    this._reconnectToServer();
+    if (this._status !== C.STATUS.STOPPED) {
+      Logger.error('[mcs-media] Media server was disconnected for some reason, will have to clean up all elements and notify users');
+      this._status = C.STATUS.STOPPED;
+      this._destroyElements();
+      this._destroyMediaServer();
+      this.emit(C.ERROR.MEDIA_SERVER_OFFLINE);
+      this._reconnectToServer();
+    }
   }
 
   _onReconnection (sameSession) {
     if (!sameSession) {
+      if (this._status !== C.STATUS.STOPPED) {
+        this._status = C.STATUS.RESTARTING;
+        this._destroyElements();
+        this._destroyMediaServer();
+        this.emit(C.ERROR.MEDIA_SERVER_OFFLINE);
+      }
+
+      this._status = C.STATUS.STARTED;
       Logger.info('[mcs-media] Media server is back online');
       this.emit(C.EVENT.MEDIA_SERVER_ONLINE);
     }
   }
 
   _reconnectToServer () {
-    if (this._reconnectionRoutine == null) {
+    if (this._reconnectionRoutine == null && this._status !== C.STATUS.RESTARTING) {
+      this._status = C.STATUS.RESTARTING;
       this._reconnectionRoutine = setInterval(async () => {
         try {
-          this._mediaServer = await this._getMediaServerClient(this._serverUri);
+          for (var i = 0; i < KURENTO_WEBSOCKET_POOL_SIZE; i++) {
+            let client = await this._getMediaServerClient(this._serverUri);
+            this._mediaServers.push(client);
+          }
+          Logger.info("[mcs-media] Retrieved", this._mediaServers.length, "media server clients");
           this._monitorConnectionState();
           clearInterval(this._reconnectionRoutine);
           this._reconnectionRoutine = null;
           Logger.warn("[mcs-media] Reconnection to media server succeeded");
         }
         catch (error) {
-          delete this._mediaServer;
+          this._mediaServers = [];
         }
       }, 2000);
     }
   }
 
+  _getClientFromPool () {
+    // Round robin the pool
+    let client = this._mediaServers.shift();
+    this._mediaServers.push(client);
+    return client;
+  }
 
   _getMediaPipeline (roomId) {
     return new Promise((resolve, reject) => {
@@ -106,7 +138,9 @@ module.exports = class MediaServer extends EventEmitter {
           Logger.warn('[mcs-media] Pipeline for', roomId, ' already exists.');
           return resolve(this._mediaPipelines[roomId]);
         }
-        this._mediaServer.create('MediaPipeline', (error, pipeline) => {
+
+        let mediaServer = this._getClientFromPool();
+        mediaServer.create('MediaPipeline', (error, pipeline) => {
           if (error) {
             return reject(this._handleError(error));
           }
-- 
GitLab