From 114cba670f8241fa4b2e1aaed85c45f87c4849d8 Mon Sep 17 00:00:00 2001
From: Ghazi Triki <ghazi.triki@riadvice.tn>
Date: Thu, 6 Dec 2018 21:35:51 +0100
Subject: [PATCH] Improved redis connection logging.

---
 .../redis/Red5AppsRedisSubscriberActor.scala  |  4 +--
 .../common2/redis/RedisAwareCommunicator.java | 24 +++++++++++++++
 .../common2/redis/RedisStorageService.java    | 13 ++++++---
 .../common2/redis/pubsub/MessageReceiver.java |  2 ++
 .../common2/redis/pubsub/MessageSender.java   |  2 ++
 .../redis/RedisConnectionHandler.scala        | 29 +++++++++++++++++++
 .../common2/redis/RedisPublisher.scala        |  8 ++++-
 .../redis/RedisSubscriberProvider.scala       | 13 ++++-----
 .../redis/WebRedisSubscriberActor.scala       |  4 +--
 .../screenshare/server/util/LogHelper.scala   |  1 -
 10 files changed, 83 insertions(+), 17 deletions(-)
 create mode 100644 bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala

diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala
index e23985f145..0281a091f3 100644
--- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala
+++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala
@@ -39,9 +39,9 @@ class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromA
       }
       def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
       def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
-      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) }
+      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
       def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
-      def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) }
+      def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
     })
   }
 
diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java
index 1a8d5f0108..23dc26d26f 100644
--- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java
+++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java
@@ -20,13 +20,25 @@
 package org.bigbluebutton.common2.redis;
 
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
 
 import io.lettuce.core.RedisClient;
+import io.lettuce.core.event.Event;
+import io.lettuce.core.event.EventBus;
+import io.lettuce.core.event.connection.ConnectedEvent;
+import io.lettuce.core.event.connection.ConnectionActivatedEvent;
+import io.lettuce.core.event.connection.ConnectionDeactivatedEvent;
+import io.lettuce.core.event.connection.DisconnectedEvent;
+import reactor.core.Disposable;
 
 public abstract class RedisAwareCommunicator {
 
     protected RedisClient redisClient;
 
+    protected Disposable eventBusSubscription;
+
+    protected EventBus eventBus;
+
     protected String host;
     protected String password;
     protected int port;
@@ -41,6 +53,18 @@ public abstract class RedisAwareCommunicator {
         this.password = password;
     }
 
+    protected void connectionStatusHandler(Event event, Logger log) {
+        if (event instanceof ConnectedEvent) {
+            log.info("Connected to redis");
+        } else if (event instanceof ConnectionActivatedEvent) {
+            log.info("Connected to redis activated");
+        } else if (event instanceof DisconnectedEvent) {
+            log.info("Disconnected from redis");
+        } else if (event instanceof ConnectionDeactivatedEvent) {
+            log.info("Connected to redis deactivated");
+        }
+    }
+
     public void setClientName(String clientName) {
         this.clientName = clientName;
     }
diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java
index a419db0a5a..31dae83396 100644
--- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java
+++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java
@@ -43,11 +43,14 @@ public class RedisStorageService extends RedisAwareCommunicator {
 
         redisClient = RedisClient.create(redisUri);
         redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
+        eventBus = redisClient.getResources().eventBus();
+        eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
 
         connection = redisClient.connect();
     }
 
     public void stop() {
+        eventBusSubscription.dispose();
         connection.close();
         redisClient.shutdown();
         log.info("RedisStorageService Stopped");
@@ -93,15 +96,17 @@ public class RedisStorageService extends RedisAwareCommunicator {
         RedisCommands<String, String> commands = connection.sync();
 
         Long msgid = commands.incr("global:nextRecordedMsgId");
-        commands.hmset("recording:" + meetingId + ":" + msgid, event);
+        String key = "recording:" + meetingId + ":" + msgid;
+        commands.hmset(key, event);
         /**
          * We set the key to expire after 14 days as we are still recording the
          * event into redis even if the meeting is not recorded. (ralam sept 23,
          * 2015)
          */
-        commands.expire("meeting:" + meetingId + ":recordings", expireKey);
-        commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid));
-        commands.expire("meeting:" + meetingId + ":recordings", expireKey);
+        commands.expire(key, expireKey);
+        key = "meeting:" + meetingId + ":recordings";
+        commands.rpush(key, Long.toString(msgid));
+        commands.expire(key, expireKey);
         connection.close();
     }
 
diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java
index 6fd04a5fdc..ade4b5d648 100755
--- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java
+++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java
@@ -41,6 +41,8 @@ public class MessageReceiver extends RedisAwareCommunicator {
 
         redisClient = RedisClient.create(redisUri);
         redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
+        eventBus = redisClient.getResources().eventBus();
+        eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
 
         connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
                 createPoolingConfig());
diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java
index b35b54c335..8209f5bf76 100755
--- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java
+++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java
@@ -43,6 +43,8 @@ public class MessageSender extends RedisAwareCommunicator {
 
         redisClient = RedisClient.create(redisUri);
         redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
+        eventBus = redisClient.getResources().eventBus();
+        eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
 
         connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
                 createPoolingConfig());
diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala
new file mode 100644
index 0000000000..c30003c95d
--- /dev/null
+++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala
@@ -0,0 +1,29 @@
+package org.bigbluebutton.common2.redis
+
+import io.lettuce.core.RedisClient
+import io.lettuce.core.event.Event
+import io.lettuce.core.event.EventBus
+import io.lettuce.core.event.connection.{ ConnectionDeactivatedEvent, ConnectionActivatedEvent, ConnectedEvent, DisconnectedEvent }
+import reactor.core.Disposable
+import akka.event.LoggingAdapter
+
+trait RedisConnectionHandler {
+
+  def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) {
+    val eventBus: EventBus = redis.getResources().eventBus();
+    // @todo : unsubscribe when connection is closed
+    val eventBusSubscription: Disposable = eventBus.get().subscribe(e => connectionStatusHandler(e, log))
+  }
+
+  def connectionStatusHandler(event: Event, log: LoggingAdapter) {
+    if (event.isInstanceOf[ConnectedEvent]) {
+      log.info("Connected to redis");
+    } else if (event.isInstanceOf[ConnectionActivatedEvent]) {
+      log.info("Connection to redis activated");
+    } else if (event.isInstanceOf[DisconnectedEvent]) {
+      log.info("Disconnected from redis");
+    } else if (event.isInstanceOf[ConnectionDeactivatedEvent]) {
+      log.info("Connection to redis deactivated");
+    }
+  }
+}
diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala
index 60bfcbeec9..47c5af6f10 100755
--- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala
+++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala
@@ -1,8 +1,14 @@
 package org.bigbluebutton.common2.redis
 
 import akka.actor.ActorSystem
+import akka.event.Logging
+
+class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) with RedisConnectionHandler {
+
+  val log = Logging(system, getClass)
+
+  subscribeToEventBus(redis, log)
 
-class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) {
   val connection = redis.connectPubSub()
 
   redis.connect()
diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala
index 90febddff1..e091e9ad25 100644
--- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala
+++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala
@@ -18,7 +18,10 @@ import java.io.PrintWriter
 abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
                                        channels: Seq[String], patterns: Seq[String],
                                        jsonMsgBus: IncomingJsonMessageBus)
-  extends RedisClientProvider(system, clientName) with Actor with ActorLogging {
+  extends RedisClientProvider(system, clientName) with RedisConnectionHandler with Actor with ActorLogging {
+
+  subscribeToEventBus(redis, log)
+
   var connection = redis.connectPubSub()
 
   def addListener(appChannel: String) {
@@ -31,9 +34,9 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
       }
       def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
       def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
-      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) }
+      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
       def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
-      def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) }
+      def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
     })
   }
 
@@ -52,10 +55,6 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
       Resume
     }
   }
-  
-  def publishEvent() {
-    
-  }
 
   def receive = {
     case _ => // do nothing
diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala
index f6158998de..bcb2e4b3d2 100755
--- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala
+++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala
@@ -42,9 +42,9 @@ class WebRedisSubscriberActor(
         oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage))
       }
       def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
-      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) }
+      def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
       def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
-      def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) }
+      def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
     })
   }
 
diff --git a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala
index dbe83dc52d..2ed8ee87d1 100755
--- a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala
+++ b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala
@@ -1,6 +1,5 @@
 package org.bigbluebutton.app.screenshare.server.util
 
-import org.slf4j.Logger
 import org.red5.logging.Red5LoggerFactory
 
 /**
-- 
GitLab