diff --git a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/ReceivedMessage.java b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/ReceivedMessage.java new file mode 100755 index 0000000000000000000000000000000000000000..6c3307b38d3e566cb2e8fe9d360df612fe9455ab --- /dev/null +++ b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/ReceivedMessage.java @@ -0,0 +1,28 @@ +package org.bigbluebutton.core.pubsub.receivers; + +public class ReceivedMessage { + + private final String pattern; + private final String channel; + private final String message; + + public ReceivedMessage(String pattern, String channel, String message) { + this.pattern = pattern; + this.channel = channel; + this.message = message; + } + + public String getPattern() { + return pattern; + } + + public String getChannel() { + return channel; + } + + public String getMessage() { + return message; + } + + +} diff --git a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/RedisMessageReceiver.java b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/RedisMessageReceiver.java index 3c932c0e4ddf059aef74dbb79e4c8a4224dbd4d8..d503d2f62c961dbf91d3c19e55f3ddcfe4c4001c 100755 --- a/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/RedisMessageReceiver.java +++ b/akka-bbb-apps/src/main/java/org/bigbluebutton/core/pubsub/receivers/RedisMessageReceiver.java @@ -2,10 +2,24 @@ package org.bigbluebutton.core.pubsub.receivers; import java.util.ArrayList; import java.util.List; - +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import org.bigbluebutton.core.api.IBigBlueButtonInGW; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RedisMessageReceiver { + private static final Logger log = LoggerFactory.getLogger(RedisMessageReceiver.class); + + private BlockingQueue<ReceivedMessage> receivedMessages = new LinkedBlockingQueue<ReceivedMessage>(); + + private volatile boolean processMessage = false; + + private final Executor msgProcessorExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); + private List<MessageHandler> receivers; private IBigBlueButtonInGW bbbGW; @@ -13,6 +27,7 @@ public class RedisMessageReceiver { this.bbbGW = bbbGW; receivers = new ArrayList<MessageHandler>(); setupReceivers(); + start(); } private void setupReceivers() { @@ -39,8 +54,44 @@ public class RedisMessageReceiver { } public void handleMessage(String pattern, String channel, String message) { - for (MessageHandler l : receivers) { - l.handleMessage(pattern, channel, message); - } + ReceivedMessage rm = new ReceivedMessage(pattern, channel, message); + receivedMessages.add(rm); } + + public void stop() { + processMessage = false; + } + + public void start() { + try { + processMessage = true; + + Runnable messageProcessor = new Runnable() { + public void run() { + while (processMessage) { + try { + ReceivedMessage msg = receivedMessages.take(); + processMessage(msg); + } catch (InterruptedException e) { + log.warn("Error while taking received message from queue."); + } + } + } + }; + msgProcessorExec.execute(messageProcessor); + } catch (Exception e) { + log.error("Error subscribing to channels: " + e.getMessage()); + } + } + + private void processMessage(final ReceivedMessage msg) { + Runnable task = new Runnable() { + public void run() { + for (MessageHandler l : receivers) { + l.handleMessage(msg.getPattern(), msg.getChannel(), msg.getMessage()); + } + } + }; + runExec.execute(task); + } } diff --git a/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/service/ParticipantsService.java b/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/service/ParticipantsService.java index 2fec3abf851367c438f48279937bd0a3d8698cf1..39b51754601e71f9bcb6ae964b658b67f76b9edb 100755 --- a/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/service/ParticipantsService.java +++ b/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/service/ParticipantsService.java @@ -62,6 +62,7 @@ public class ParticipantsService { String emojiStatus = (String) msg.get("emojiStatus"); if (StringUtils.isEmpty(emojiStatus)) { + log.warn("Invalid EmojiStatus from client: meetingId=" + meetingId + ", userId=" + userId + ",emoji=" + emojiStatus); // Set emojiStatus=none if passed is null. emojiStatus = "none"; }