Skip to content
Snippets Groups Projects
Commit 114cba67 authored by Ghazi Triki's avatar Ghazi Triki
Browse files

Improved redis connection logging.

parent c9791b32
No related branches found
No related tags found
No related merge requests found
Showing
with 83 additions and 17 deletions
...@@ -39,9 +39,9 @@ class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromA ...@@ -39,9 +39,9 @@ class Red5AppsRedisSubscriberActor(system: ActorSystem, jsonMsgBus: JsonMsgFromA
} }
def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
}) })
} }
......
...@@ -20,13 +20,25 @@ ...@@ -20,13 +20,25 @@
package org.bigbluebutton.common2.redis; package org.bigbluebutton.common2.redis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import io.lettuce.core.RedisClient; import io.lettuce.core.RedisClient;
import io.lettuce.core.event.Event;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectedEvent;
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
import io.lettuce.core.event.connection.ConnectionDeactivatedEvent;
import io.lettuce.core.event.connection.DisconnectedEvent;
import reactor.core.Disposable;
public abstract class RedisAwareCommunicator { public abstract class RedisAwareCommunicator {
protected RedisClient redisClient; protected RedisClient redisClient;
protected Disposable eventBusSubscription;
protected EventBus eventBus;
protected String host; protected String host;
protected String password; protected String password;
protected int port; protected int port;
...@@ -41,6 +53,18 @@ public abstract class RedisAwareCommunicator { ...@@ -41,6 +53,18 @@ public abstract class RedisAwareCommunicator {
this.password = password; this.password = password;
} }
protected void connectionStatusHandler(Event event, Logger log) {
if (event instanceof ConnectedEvent) {
log.info("Connected to redis");
} else if (event instanceof ConnectionActivatedEvent) {
log.info("Connected to redis activated");
} else if (event instanceof DisconnectedEvent) {
log.info("Disconnected from redis");
} else if (event instanceof ConnectionDeactivatedEvent) {
log.info("Connected to redis deactivated");
}
}
public void setClientName(String clientName) { public void setClientName(String clientName) {
this.clientName = clientName; this.clientName = clientName;
} }
......
...@@ -43,11 +43,14 @@ public class RedisStorageService extends RedisAwareCommunicator { ...@@ -43,11 +43,14 @@ public class RedisStorageService extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri); redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connection = redisClient.connect(); connection = redisClient.connect();
} }
public void stop() { public void stop() {
eventBusSubscription.dispose();
connection.close(); connection.close();
redisClient.shutdown(); redisClient.shutdown();
log.info("RedisStorageService Stopped"); log.info("RedisStorageService Stopped");
...@@ -93,15 +96,17 @@ public class RedisStorageService extends RedisAwareCommunicator { ...@@ -93,15 +96,17 @@ public class RedisStorageService extends RedisAwareCommunicator {
RedisCommands<String, String> commands = connection.sync(); RedisCommands<String, String> commands = connection.sync();
Long msgid = commands.incr("global:nextRecordedMsgId"); Long msgid = commands.incr("global:nextRecordedMsgId");
commands.hmset("recording:" + meetingId + ":" + msgid, event); String key = "recording:" + meetingId + ":" + msgid;
commands.hmset(key, event);
/** /**
* We set the key to expire after 14 days as we are still recording the * We set the key to expire after 14 days as we are still recording the
* event into redis even if the meeting is not recorded. (ralam sept 23, * event into redis even if the meeting is not recorded. (ralam sept 23,
* 2015) * 2015)
*/ */
commands.expire("meeting:" + meetingId + ":recordings", expireKey); commands.expire(key, expireKey);
commands.rpush("meeting:" + meetingId + ":recordings", Long.toString(msgid)); key = "meeting:" + meetingId + ":recordings";
commands.expire("meeting:" + meetingId + ":recordings", expireKey); commands.rpush(key, Long.toString(msgid));
commands.expire(key, expireKey);
connection.close(); connection.close();
} }
......
...@@ -41,6 +41,8 @@ public class MessageReceiver extends RedisAwareCommunicator { ...@@ -41,6 +41,8 @@ public class MessageReceiver extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri); redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(), connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig()); createPoolingConfig());
......
...@@ -43,6 +43,8 @@ public class MessageSender extends RedisAwareCommunicator { ...@@ -43,6 +43,8 @@ public class MessageSender extends RedisAwareCommunicator {
redisClient = RedisClient.create(redisUri); redisClient = RedisClient.create(redisUri);
redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build()); redisClient.setOptions(ClientOptions.builder().autoReconnect(true).build());
eventBus = redisClient.getResources().eventBus();
eventBusSubscription = eventBus.get().subscribe(e -> connectionStatusHandler(e, log));
connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(), connectionPool = ConnectionPoolSupport.createGenericObjectPool(() -> redisClient.connectPubSub(),
createPoolingConfig()); createPoolingConfig());
......
package org.bigbluebutton.common2.redis
import io.lettuce.core.RedisClient
import io.lettuce.core.event.Event
import io.lettuce.core.event.EventBus
import io.lettuce.core.event.connection.{ ConnectionDeactivatedEvent, ConnectionActivatedEvent, ConnectedEvent, DisconnectedEvent }
import reactor.core.Disposable
import akka.event.LoggingAdapter
trait RedisConnectionHandler {
def subscribeToEventBus(redis: RedisClient, log: LoggingAdapter) {
val eventBus: EventBus = redis.getResources().eventBus();
// @todo : unsubscribe when connection is closed
val eventBusSubscription: Disposable = eventBus.get().subscribe(e => connectionStatusHandler(e, log))
}
def connectionStatusHandler(event: Event, log: LoggingAdapter) {
if (event.isInstanceOf[ConnectedEvent]) {
log.info("Connected to redis");
} else if (event.isInstanceOf[ConnectionActivatedEvent]) {
log.info("Connection to redis activated");
} else if (event.isInstanceOf[DisconnectedEvent]) {
log.info("Disconnected from redis");
} else if (event.isInstanceOf[ConnectionDeactivatedEvent]) {
log.info("Connection to redis deactivated");
}
}
}
package org.bigbluebutton.common2.redis package org.bigbluebutton.common2.redis
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.event.Logging
class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) with RedisConnectionHandler {
val log = Logging(system, getClass)
subscribeToEventBus(redis, log)
class RedisPublisher(system: ActorSystem, clientName: String) extends RedisClientProvider(system, clientName) {
val connection = redis.connectPubSub() val connection = redis.connectPubSub()
redis.connect() redis.connect()
......
...@@ -18,7 +18,10 @@ import java.io.PrintWriter ...@@ -18,7 +18,10 @@ import java.io.PrintWriter
abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
channels: Seq[String], patterns: Seq[String], channels: Seq[String], patterns: Seq[String],
jsonMsgBus: IncomingJsonMessageBus) jsonMsgBus: IncomingJsonMessageBus)
extends RedisClientProvider(system, clientName) with Actor with ActorLogging { extends RedisClientProvider(system, clientName) with RedisConnectionHandler with Actor with ActorLogging {
subscribeToEventBus(redis, log)
var connection = redis.connectPubSub() var connection = redis.connectPubSub()
def addListener(appChannel: String) { def addListener(appChannel: String) {
...@@ -31,9 +34,9 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, ...@@ -31,9 +34,9 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
} }
def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) } def message(pattern: String, channel: String, message: String): Unit = { log.info("Subscribed to channel {} with pattern {}", channel, pattern) }
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
}) })
} }
...@@ -52,10 +55,6 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String, ...@@ -52,10 +55,6 @@ abstract class RedisSubscriberProvider(system: ActorSystem, clientName: String,
Resume Resume
} }
} }
def publishEvent() {
}
def receive = { def receive = {
case _ => // do nothing case _ => // do nothing
......
...@@ -42,9 +42,9 @@ class WebRedisSubscriberActor( ...@@ -42,9 +42,9 @@ class WebRedisSubscriberActor(
oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage)) oldMessageEventBus.publish(OldIncomingJsonMessage(fromAkkaAppsOldJsonChannel, receivedJsonMessage))
} }
def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) } def psubscribed(pattern: String, count: Long): Unit = { log.info("Subscribed to pattern {}", pattern) }
def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed to pattern {}", pattern) } def punsubscribed(pattern: String, count: Long): Unit = { log.info("Unsubscribed from pattern {}", pattern) }
def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) } def subscribed(channel: String, count: Long): Unit = { log.info("Subscribed to pattern {}", channel) }
def unsubscribed(channel: String, count: Long): Unit = { log.info("Subscribed to channel {}", channel) } def unsubscribed(channel: String, count: Long): Unit = { log.info("Unsubscribed from channel {}", channel) }
}) })
} }
......
package org.bigbluebutton.app.screenshare.server.util package org.bigbluebutton.app.screenshare.server.util
import org.slf4j.Logger
import org.red5.logging.Red5LoggerFactory import org.red5.logging.Red5LoggerFactory
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment