diff --git a/bbb-video/build.gradle b/bbb-video/build.gradle index b55635cbe28776a4fed15097a27f07e83d19f3be..4b258fa5589ec92e8b35f126f19f78b3358ac983 100755 --- a/bbb-video/build.gradle +++ b/bbb-video/build.gradle @@ -58,7 +58,7 @@ dependencies { compile 'org.easymock:easymock:2.4@jar' //redis - compile 'redis.clients:jedis:2.0.0' + compile 'redis.clients:jedis:2.9.0' compile 'commons-pool:commons-pool:1.5.6' compile 'com.google.code.gson:gson:2.5' diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageDistributor.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageDistributor.java new file mode 100755 index 0000000000000000000000000000000000000000..b51ed206c8f6ea6b5da5e3681709dbfe103f6f3b --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageDistributor.java @@ -0,0 +1,25 @@ +package org.bigbluebutton.red5.pubsub; + +import java.util.Set; + +public class MessageDistributor { + private ReceivedMessageHandler handler; + private Set<MessageHandler> listeners; + + public void setMessageListeners(Set<MessageHandler> listeners) { + this.listeners = listeners; + } + + public void setMessageHandler(ReceivedMessageHandler handler) { + this.handler = handler; + if (handler != null) { + handler.setMessageDistributor(this); + } + } + + public void notifyListeners(String pattern, String channel, String message) { + for (MessageHandler listener : listeners) { + listener.handleMessage(pattern, channel, message); + } + } +} \ No newline at end of file diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageHandler.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageHandler.java new file mode 100755 index 0000000000000000000000000000000000000000..8f197312c290d0ef21e0baed37478d9d7bfb94e3 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageHandler.java @@ -0,0 +1,5 @@ +package org.bigbluebutton.red5.pubsub; + +public interface MessageHandler { + void handleMessage(String pattern, String channel, String message); +} diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageReceiver.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageReceiver.java new file mode 100755 index 0000000000000000000000000000000000000000..f610e47f09e56bc27e5926ef0cdd94d72016b80e --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageReceiver.java @@ -0,0 +1,115 @@ +package org.bigbluebutton.red5.pubsub; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.exceptions.JedisConnectionException; + +public class MessageReceiver { + private static Logger log = LoggerFactory.getLogger(MessageReceiver.class); + + private ReceivedMessageHandler handler; + + private Jedis jedis; + private volatile boolean receiveMessage = false; + + private final Executor msgReceiverExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); + + private final String FROM_BBB_APPS_PATTERN = "from-akka-apps-redis-channel"; + + private String host; + private int port; + + public void stop() { + receiveMessage = false; + } + + public void start() { + log.info("Ready to receive messages from Redis pubsub."); + try { + receiveMessage = true; + jedis = new Jedis(host, port); + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + jedis.clientSetname("BbbWebSub"); + + Runnable messageReceiver = new Runnable() { + public void run() { + if (receiveMessage) { + try { + jedis.subscribe(new PubSubListener(), FROM_BBB_APPS_PATTERN); + } catch(JedisConnectionException ex) { + log.warn("Exception on Jedis connection. Resubscribing to pubsub."); + start(); + } catch (Exception e) { + log.error("Error resubscribing to channels: " + e.getMessage()); + } + } + } + }; + msgReceiverExec.execute(messageReceiver); + } catch (Exception e) { + log.error("Error subscribing to channels: " + e.getMessage()); + } + } + + public void setHost(String host){ + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + + public void setMessageHandler(ReceivedMessageHandler handler) { + this.handler = handler; + } + + private class PubSubListener extends JedisPubSub { + + public PubSubListener() { + super(); + } + + @Override + public void onMessage(String channel, String message) { + // Not used. + } + + @Override + public void onPMessage(final String pattern, final String channel, final String message) { + Runnable task = new Runnable() { + public void run() { + handler.handleMessage(pattern, channel, message); + } + }; + + runExec.execute(task); + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) { + log.debug("Subscribed to the pattern: " + pattern); + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) { + // Not used. + } + + @Override + public void onSubscribe(String channel, int subscribedChannels) { + // Not used. + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) { + // Not used. + } + } +} + diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessage.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessage.java new file mode 100755 index 0000000000000000000000000000000000000000..41c775f739fc95f8e0c827ff1aa4a2adf6976ee5 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessage.java @@ -0,0 +1,25 @@ +package org.bigbluebutton.red5.pubsub; + +public class ReceivedMessage { + private final String pattern; + private final String channel; + private final String message; + + public ReceivedMessage(String pattern, String channel, String message) { + this.pattern = pattern; + this.channel = channel; + this.message = message; + } + + public String getPattern() { + return pattern; + } + + public String getChannel() { + return channel; + } + + public String getMessage() { + return message; + } +} diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessageHandler.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessageHandler.java new file mode 100755 index 0000000000000000000000000000000000000000..da561c50e862bd4f66a459e48cf60642dd9bea22 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/ReceivedMessageHandler.java @@ -0,0 +1,75 @@ +package org.bigbluebutton.red5.pubsub; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReceivedMessageHandler { + private static Logger log = LoggerFactory.getLogger(ReceivedMessageHandler.class); + + private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>(); + + private volatile boolean processMessage = false; + + private final Executor msgProcessorExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); + + private MessageDistributor handler; + + public void stop() { + processMessage = false; + } + + public void start() { + log.info("Ready to handle messages from Redis pubsub!"); + + try { + processMessage = true; + + Runnable messageProcessor = new Runnable() { + public void run() { + while (processMessage) { + try { + ReceivedMessage msg = receivedMessages.take(); + processMessage(msg); + } catch (InterruptedException e) { + log.warn("Error while taking received message from queue."); + } + } + } + }; + msgProcessorExec.execute(messageProcessor); + } catch (Exception e) { + log.error("Error subscribing to channels: " + e.getMessage()); + } + } + + private void processMessage(final ReceivedMessage msg) { + Runnable task = new Runnable() { + public void run() { + if (handler != null) { +// log.debug("Let's process this message: " + msg.getMessage()); + + handler.notifyListeners(msg.getPattern(), msg.getChannel(), msg.getMessage()); + } else { + log.warn("No listeners interested in messages from Redis!"); + } + } + }; + + runExec.execute(task); + } + + public void handleMessage(String pattern, String channel, String message) { + ReceivedMessage rm = new ReceivedMessage(pattern, channel, message); + receivedMessages.add(rm); + } + + public void setMessageDistributor(MessageDistributor h) { + this.handler = h; + } +}