diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala index e23985f145a1ab6eae6b7cd3c664d440083268d1..0281a091f34ae86e23e4494eef8682934775e904 100644 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/endpoint/redis/Red5AppsRedisSubscriberActor.scala @@ -39,9 +39,9 @@ class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromA } def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } - def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } - def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) } }) } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java index 1a8d5f0108a8931605aaf441e9bc21a6259e0835..23dc26d26f04721273a3a8f552001d87dfd0ed8a 100644 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisAwareCommunicator.java @@ -20,13 +20,25 @@ package org.bigbluebutton.common2.redis; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; import io.lettuce.core.RedisClient; +import io.lettuce.core.event.Event; +import io.lettuce.core.event.EventBus; +import io.lettuce.core.event.connection.ConnectedEvent; +import io.lettuce.core.event.connection.ConnectionActivatedEvent; +import io.lettuce.core.event.connection.ConnectionDeactivatedEvent; +import io.lettuce.core.event.connection.DisconnectedEvent; +import reactor.core.Disposable; public abstract class RedisAwareCommunicator { protected RedisClient redisClient; + protected Disposable eventBusSubscription; + + protected EventBus eventBus; + protected String host; protected String password; protected int port; @@ -41,6 +53,18 @@ public abstract class RedisAwareCommunicator { this.password = password; } + protected void connectionStatusHandler(Event event, Logger log) { + if (event instanceof ConnectedEvent) { + log.info("Connected to redis"); + } else if (event instanceof ConnectionActivatedEvent) { + log.info("Connected to redis activated"); + } else if (event instanceof DisconnectedEvent) { + log.info("Disconnected from redis"); + } else if (event instanceof ConnectionDeactivatedEvent) { + log.info("Connected to redis deactivated"); + } + } + public void setClientName(String clientName) { this.clientName = clientName; } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java index a419db0a5a14869fdee5d13503c65ba8adc69c7c..31dae8339600613607f13d30672f24b52586b5b8 100644 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java @@ -43,11 +43,14 @@ public class RedisStorageService extends RedisAwareCommunicator { redisClient = RedisClient.create(redisUri); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); + eventBus = redisClient.getResources().eventBus(); + eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log)); connection = redisClient.connect(); } public void stop() { + eventBusSubscription.dispose(); connection.close(); redisClient.shutdown(); log.info("RedisStorageService Stopped"); @@ -93,15 +96,17 @@ public class RedisStorageService extends RedisAwareCommunicator { RedisCommands<String, String> commands = connection.sync(); Long msgid = commands.incr("global:nextRecordedMsgId"); - commands.hmset("recording:" + meetingId + ":" + msgid, event); + String key = "recording:" + meetingId + ":" + msgid; + commands.hmset(key, event); /** * We set the key to expire after 14 days as we are still recording the * event into redis even if the meeting is not recorded. (ralam sept 23, * 2015) */ - commands.expire("meeting:" + meetingId + ":recordings", expireKey); - commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid)); - commands.expire("meeting:" + meetingId + ":recordings", expireKey); + commands.expire(key, expireKey); + key = "meeting:" + meetingId + ":recordings"; + commands.rpush(key, Long.toString(msgid)); + commands.expire(key, expireKey); connection.close(); } diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java index 6fd04a5fdc4cf513e27bddcf812f64b64ccc88b6..ade4b5d648f11a4da9ba29adbb72de43d921edce 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageReceiver.java @@ -41,6 +41,8 @@ public class MessageReceiver extends RedisAwareCommunicator { redisClient = RedisClient.create(redisUri); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); + eventBus = redisClient.getResources().eventBus(); + eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log)); connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(), createPoolingConfig()); diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java index b35b54c335b810fcd32965d231e0370d070ae740..8209f5bf76934d9959a385861ef773f184845bd4 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/pubsub/MessageSender.java @@ -43,6 +43,8 @@ public class MessageSender extends RedisAwareCommunicator { redisClient = RedisClient.create(redisUri); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); + eventBus = redisClient.getResources().eventBus(); + eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log)); connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(), createPoolingConfig()); diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala new file mode 100644 index 0000000000000000000000000000000000000000..c30003c95da1becaaff67d45c705d67bcf09a650 --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisConnectionHandler.scala @@ -0,0 +1,29 @@ +package org.bigbluebutton.common2.redis + +import io.lettuce.core.RedisClient +import io.lettuce.core.event.Event +import io.lettuce.core.event.EventBus +import io.lettuce.core.event.connection.{ ConnectionDeactivatedEvent, ConnectionActivatedEvent, ConnectedEvent, DisconnectedEvent } +import reactor.core.Disposable +import akka.event.LoggingAdapter + +trait RedisConnectionHandler { + + def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) { + val eventBus: EventBus = redis.getResources().eventBus(); + // @todo : unsubscribe when connection is closed + val eventBusSubscription: Disposable = eventBus.get().subscribe(e => connectionStatusHandler(e, log)) + } + + def connectionStatusHandler(event: Event, log: LoggingAdapter) { + if (event.isInstanceOf[ConnectedEvent]) { + log.info("Connected to redis"); + } else if (event.isInstanceOf[ConnectionActivatedEvent]) { + log.info("Connection to redis activated"); + } else if (event.isInstanceOf[DisconnectedEvent]) { + log.info("Disconnected from redis"); + } else if (event.isInstanceOf[ConnectionDeactivatedEvent]) { + log.info("Connection to redis deactivated"); + } + } +} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala index 60bfcbeec9e1195c83e23b6254d0b2b79527b7f0..47c5af6f10dcba75a018bcc1658bfebd12219fb2 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisPublisher.scala @@ -1,8 +1,14 @@ package org.bigbluebutton.common2.redis import akka.actor.ActorSystem +import akka.event.Logging + +class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) with RedisConnectionHandler { + + val log = Logging(system, getClass) + + subscribeToEventBus(redis, log) -class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) { val connection = redis.connectPubSub() redis.connect() diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala index 90febddff18206261d99ab6e3ee7893de3a27f30..e091e9ad25919454973ae8f4bf450c712d89eccc 100644 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/redis/RedisSubscriberProvider.scala @@ -18,7 +18,10 @@ import java.io.PrintWriter abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, channels: Seq[String], patterns: Seq[String], jsonMsgBus: IncomingJsonMessageBus) - extends RedisClientProvider(system, clientName) with Actor with ActorLogging { + extends RedisClientProvider(system, clientName) with RedisConnectionHandler with Actor with ActorLogging { + + subscribeToEventBus(redis, log) + var connection = redis.connectPubSub() def addListener(appChannel: String) { @@ -31,9 +34,9 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, } def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } - def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } - def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) } }) } @@ -52,10 +55,6 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, Resume } } - - def publishEvent() { - - } def receive = { case _ => // do nothing diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala index f6158998de6792db597d139f4ff5297188a73d1a..bcb2e4b3d26ebbe6d7d9b7573849162364e3f35b 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/endpoint/redis/WebRedisSubscriberActor.scala @@ -42,9 +42,9 @@ class WebRedisSubscriberActor( oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage)) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } - def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } + def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } - def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } + def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) } }) } diff --git a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala index dbe83dc52d9a768277f146622f565f816d6b2033..2ed8ee87d1a79cd87144b8b0c09b2e4bc9d4c960 100755 --- a/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala +++ b/bbb-screenshare/app/src/main/scala/org/bigbluebutton/app/screenshare/server/util/LogHelper.scala @@ -1,6 +1,5 @@ package org.bigbluebutton.app.screenshare.server.util -import org.slf4j.Logger import org.red5.logging.Red5LoggerFactory /**