diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java index 6ccc4d211445d837b924ab1c400d693c4f33409a..e5dbe23501afd048e777a5d53c0b236f5f74f0eb 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java @@ -53,16 +53,16 @@ public class MessageSender { sendMessage = true; Runnable messageSender = new Runnable() { - public void run() { - while (sendMessage) { - try { + public void run() { + while (sendMessage) { + try { MessageToSend msg = messages.take(); publish(msg.getChannel(), msg.getMessage()); } catch (InterruptedException e) { log.warn("Failed to get org.bigbluebutton.red5.pubsub.message from queue."); - } - } - } + } + } + } }; msgSenderExec.execute(messageSender); } catch (Exception e) { @@ -77,16 +77,16 @@ public class MessageSender { private void publish(final String channel, final String message) { Runnable task = new Runnable() { - public void run() { - Jedis jedis = redisPool.getResource(); - try { - jedis.publish(channel, message); - } catch(Exception e){ - log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e); - } finally { - redisPool.returnResource(jedis); - } - } + public void run() { + Jedis jedis = redisPool.getResource(); + try { + jedis.publish(channel, message); + } catch(Exception e){ + log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e); + } finally { + redisPool.returnResource(jedis); + } + } }; runExec.execute(task); diff --git a/bbb-voice/build.gradle b/bbb-voice/build.gradle index c7e57db98cd6003ecdc06e74cc12b8021dd92fe5..f58d7d1375cb526ae99116a2050f7e13c92a3bb5 100755 --- a/bbb-voice/build.gradle +++ b/bbb-voice/build.gradle @@ -55,11 +55,14 @@ dependencies { // Testing testRuntime 'org.easymock:easymock:2.4@jar' - - // Redis pubsub - compile "redis.clients:jedis:2.1.0" - compile 'commons-pool:commons-pool:1.5.6' + + //redis + compile 'redis.clients:jedis:2.9.0' + compile 'org.apache.commons:commons-pool2:2.3' compile 'com.google.code.gson:gson:2.5' + + compile 'org.apache.commons:commons-lang3:3.5' + compile 'org.bigbluebutton:bbb-common-message_2.12:0.0.19-SNAPSHOT' } diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/GenericObjectPoolConfigWrapper.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/GenericObjectPoolConfigWrapper.java deleted file mode 100755 index 47f3380f18929630792389aba30428bad3280bdc..0000000000000000000000000000000000000000 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/GenericObjectPoolConfigWrapper.java +++ /dev/null @@ -1,142 +0,0 @@ -/** -* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ -* -* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). -* -* This program is free software; you can redistribute it and/or modify it under the -* terms of the GNU Lesser General Public License as published by the Free Software -* Foundation; either version 3.0 of the License, or (at your option) any later -* version. -* -* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY -* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public License along -* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. -* -*/ -package org.bigbluebutton.voiceconf.messaging; - -import org.apache.commons.pool.impl.GenericObjectPool; - -public class GenericObjectPoolConfigWrapper { - - private final GenericObjectPool.Config config; - - public GenericObjectPoolConfigWrapper() { - this.config = new GenericObjectPool.Config(); - } - - public GenericObjectPool.Config getConfig() { - return config; - } - - public int getMaxIdle() { - return this.config.maxIdle; - } - - public void setMaxIdle(int maxIdle) { - this.config.maxIdle = maxIdle; - } - - public int getMinIdle() { - return this.config.minIdle; - } - - public void setMinIdle(int minIdle) { - this.config.minIdle = minIdle; - } - - public int getMaxActive() { - return this.config.maxActive; - } - - public void setMaxActive(int maxActive) { - this.config.maxActive = maxActive; - } - - public long getMaxWait() { - return this.config.maxWait; - } - - public void setMaxWait(long maxWait) { - this.config.maxWait = maxWait; - } - - public byte getWhenExhaustedAction() { - return this.config.whenExhaustedAction; - } - - public void setWhenExhaustedAction(byte whenExhaustedAction) { - this.config.whenExhaustedAction = whenExhaustedAction; - } - - public boolean isTestOnBorrow() { - return this.config.testOnBorrow; - } - - public void setTestOnBorrow(boolean testOnBorrow) { - this.config.testOnBorrow = testOnBorrow; - } - - public boolean isTestOnReturn() { - return this.config.testOnReturn; - } - - public void setTestOnReturn(boolean testOnReturn) { - this.config.testOnReturn = testOnReturn; - } - - public boolean isTestWhileIdle() { - return this.config.testWhileIdle; - } - - public void setTestWhileIdle(boolean testWhileIdle) { - this.config.testWhileIdle = testWhileIdle; - } - - public long getTimeBetweenEvictionRunsMillis() { - return this.config.timeBetweenEvictionRunsMillis; - } - - public void setTimeBetweenEvictionRunsMillis( - long timeBetweenEvictionRunsMillis) { - this.config.timeBetweenEvictionRunsMillis = - timeBetweenEvictionRunsMillis; - } - - public int getNumTestsPerEvictionRun() { - return this.config.numTestsPerEvictionRun; - } - - public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { - this.config.numTestsPerEvictionRun = numTestsPerEvictionRun; - } - - public long getMinEvictableIdleTimeMillis() { - return this.config.minEvictableIdleTimeMillis; - } - - public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { - this.config.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; - } - - public long getSoftMinEvictableIdleTimeMillis() { - return this.config.softMinEvictableIdleTimeMillis; - } - - public void setSoftMinEvictableIdleTimeMillis( - long softMinEvictableIdleTimeMillis) { - this.config.softMinEvictableIdleTimeMillis = - softMinEvictableIdleTimeMillis; - } - - public boolean isLifo() { - return this.config.lifo; - } - - public void setLifo(boolean lifo) { - this.config.lifo = lifo; - } -} diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/IMessagingService.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/IMessagingService.java index 72e5905e545218c5fa0673307540aaa56c782790..f3498c6601e75236e53973a94efe8f7789d335b3 100755 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/IMessagingService.java +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/IMessagingService.java @@ -1,6 +1,7 @@ package org.bigbluebutton.voiceconf.messaging; public interface IMessagingService { + void validateConnAuthToken(String meetingId, String userId, String authToken, String connId); void userConnectedToGlobalAudio(String voiceConf, String callerIdName); void userDisconnectedFromGlobalAudio(String voiceConf, String callerIdName); } diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MeetingMessageHandler.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MeetingMessageHandler.java new file mode 100755 index 0000000000000000000000000000000000000000..04623357859b93ae1dcb41d49aff0bc66994cbe4 --- /dev/null +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MeetingMessageHandler.java @@ -0,0 +1,74 @@ +package org.bigbluebutton.voiceconf.messaging; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.bigbluebutton.voiceconf.messaging.messages.ValidateConnTokenRespMsg; +import org.bigbluebutton.voiceconf.red5.ConnectionInvokerService; +import org.red5.logging.Red5LoggerFactory; +import org.slf4j.Logger; + +public class MeetingMessageHandler implements MessageHandler { + private static Logger log = Red5LoggerFactory.getLogger(MeetingMessageHandler.class, "video"); + + private final String HEADER = "header"; + private final String NAME = "name"; + private final String BODY = "body"; + private final String MEETING_ID = "meetingId"; + private final String TIMESTAMP = "timestamp"; + private final String ENVELOPE = "envelope"; + private final String CORE = "core"; + + private final String USERID = "userId"; + private final String AUTHZED = "authzed"; + private final String CONN = "connId"; + private final String APP = "app"; + private final String VOICE_APP = "VOICE"; + + private final String ValidateConnAuthTokenSysRespMsg = "ValidateConnAuthTokenSysRespMsg"; + + private ConnectionInvokerService connInvokerService; + + public void handleMessage(String pattern, String channel, String message) { + JsonParser parser = new JsonParser(); + JsonObject obj = (JsonObject) parser.parse(message); + + if (obj.has(ENVELOPE) && obj.has(CORE)) { + JsonObject core = obj.getAsJsonObject(CORE); + JsonObject header = core.getAsJsonObject(HEADER); + if (header.has(NAME)) { + String name = header.get(NAME).getAsString(); + handle(name, core.getAsJsonObject(BODY)); + } + } + } + + private void handle(String name, JsonObject body) { + if (ValidateConnAuthTokenSysRespMsg.equals(name)) { + Gson gson = new Gson(); + String logStr = gson.toJson(body); + + log.debug("HANDLE: {}", logStr); + if (body.has(MEETING_ID) && body.has(USERID) + && body.has(AUTHZED) && body.has(CONN) && body.has(APP)) { + String meetingId = body.get(MEETING_ID).getAsString(); + String userId = body.get(USERID).getAsString(); + Boolean authzed = body.get(AUTHZED).getAsBoolean(); + String conn = body.get(CONN).getAsString(); + String app = body.get(APP).getAsString(); + + log.debug("PROCESS: {}", name); + if (VOICE_APP.equals(app)) { + ValidateConnTokenRespMsg vctrm = new ValidateConnTokenRespMsg(meetingId, userId, authzed, conn); + connInvokerService.sendMessage(vctrm); + } + } else { + log.debug("INVALID MSG FORMAT: {}", logStr); + } + } + } + + public void setConnInvokerService(ConnectionInvokerService connInvokerService) { + this.connInvokerService = connInvokerService; + } +} diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageReceiver.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageReceiver.java index 5c170fc7a78c79b1f12eadb3da20bec860db64b2..9033abe06a9a511d90f42520cf8c5daae97b5df6 100755 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageReceiver.java +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageReceiver.java @@ -7,62 +7,97 @@ import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.exceptions.JedisConnectionException; public class MessageReceiver { private static Logger log = Red5LoggerFactory.getLogger(MessageReceiver.class, "bigbluebutton"); - + private ReceivedMessageHandler handler; - - private JedisPool redisPool; + + private Jedis jedis; private volatile boolean receiveMessage = false; - + private final Executor msgReceiverExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); + + private final String FROM_BBB_APPS_PATTERN = "from-akka-apps-redis-channel"; + + private String host; + private int port; public void stop() { receiveMessage = false; } - + public void start() { log.info("Ready to receive messages from Redis pubsub."); try { receiveMessage = true; - final Jedis jedis = redisPool.getResource(); - + jedis = new Jedis(host, port); + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + jedis.clientSetname("BbbRed5VoiceSub"); + Runnable messageReceiver = new Runnable() { - public void run() { - if (receiveMessage) { - jedis.psubscribe(new PubSubListener(), MessagingConstants.TO_AKKA_APPS_CHANNEL); - } - } + public void run() { + if (receiveMessage) { + try { + jedis.subscribe(new PubSubListener(), FROM_BBB_APPS_PATTERN); + } catch(JedisConnectionException ex) { + log.warn("Exception on Jedis connection. Resubscribing to pubsub."); + start(); + } catch (Exception e) { + log.error("Error resubscribing to channels: " + e.getMessage()); + } + } + } }; msgReceiverExec.execute(messageReceiver); } catch (Exception e) { log.error("Error subscribing to channels: " + e.getMessage()); - } + } } - - public void setRedisPool(JedisPool redisPool){ - this.redisPool = redisPool; + + public void setHost(String host){ + this.host = host; } - + + public void setPort(int port) { + this.port = port; + } + public void setMessageHandler(ReceivedMessageHandler handler) { this.handler = handler; } - + private class PubSubListener extends JedisPubSub { - + public PubSubListener() { - super(); + super(); } @Override public void onMessage(String channel, String message) { // Not used. + Runnable task = new Runnable() { + public void run() { + handler.handleMessage("", channel, message); + } + }; + + runExec.execute(task); } @Override - public void onPMessage(String pattern, String channel, String message) { - handler.handleMessage(pattern, channel, message); + public void onPMessage(final String pattern, final String channel, final String message) { + System.out.println("RECEIVED onPMessage" + channel + "\n" + message); + Runnable task = new Runnable() { + public void run() { + handler.handleMessage(pattern, channel, message); + } + }; + + runExec.execute(task); } @Override @@ -83,6 +118,6 @@ public class MessageReceiver { @Override public void onUnsubscribe(String channel, int subscribedChannels) { // Not used. - } + } } } diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageSender.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageSender.java index 109f00ca0f70789cdf42b8ea31a51e65487b4731..67869c13367f6892a4a7b3e0cf82fcd393844227 100755 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageSender.java +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/MessageSender.java @@ -8,60 +8,95 @@ import org.red5.logging.Red5LoggerFactory; import org.slf4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Protocol; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class MessageSender { private static Logger log = Red5LoggerFactory.getLogger(MessageSender.class, "bigbluebutton"); - + private JedisPool redisPool; private volatile boolean sendMessage = false; - + private final Executor msgSenderExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>(); - + private String host; + private int port; + public void stop() { sendMessage = false; + redisPool.destroy(); } - - public void start() { - log.info("Redis message publisher starting!"); + + public void start() { + + GenericObjectPoolConfig config = new GenericObjectPoolConfig(); + config.setMaxTotal(32); + config.setMaxIdle(8); + config.setMinIdle(1); + config.setTestOnBorrow(true); + config.setTestOnReturn(true); + config.setTestWhileIdle(true); + config.setNumTestsPerEvictionRun(12); + config.setMaxWaitMillis(5000); + config.setTimeBetweenEvictionRunsMillis(60000); + config.setBlockWhenExhausted(true); + + // Set the name of this client to be able to distinguish when doing + // CLIENT LIST on redis-cli + redisPool = new JedisPool(config, host, port, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE, "BbbRed5VoicePub"); + + log.info("Redis org.bigbluebutton.red5.pubsub.message publisher starting!"); try { sendMessage = true; - + Runnable messageSender = new Runnable() { - public void run() { - while (sendMessage) { - try { + public void run() { + while (sendMessage) { + try { MessageToSend msg = messages.take(); publish(msg.getChannel(), msg.getMessage()); } catch (InterruptedException e) { - log.warn("Failed to get message from queue."); - } - } - } + log.warn("Failed to get org.bigbluebutton.red5.pubsub.message from queue."); + } + } + } }; msgSenderExec.execute(messageSender); } catch (Exception e) { log.error("Error subscribing to channels: " + e.getMessage()); - } + } } - + public void send(String channel, String message) { MessageToSend msg = new MessageToSend(channel, message); messages.add(msg); } - - private void publish(String channel, String message) { - Jedis jedis = redisPool.getResource(); - try { - jedis.publish(channel, message); - } catch(Exception e){ - log.warn("Cannot publish the message to redis", e); - } finally { - redisPool.returnResource(jedis); - } + + private void publish(final String channel, final String message) { + Runnable task = new Runnable() { + public void run() { + Jedis jedis = redisPool.getResource(); + try { + jedis.publish(channel, message); + } catch(Exception e){ + log.warn("Cannot publish the org.bigbluebutton.red5.pubsub.message to redis", e); + } finally { + redisPool.returnResource(jedis); + } + } + }; + + runExec.execute(task); } - - public void setRedisPool(JedisPool redisPool){ - this.redisPool = redisPool; + + + public void setHost(String host){ + this.host = host; + } + + public void setPort(int port) { + this.port = port; } } diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/RedisMessagingService.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/RedisMessagingService.java index eda175c0843995b354683524c03237c4f6b8e549..a3efffc43690a2a451b55db34e5e00e61096695f 100755 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/RedisMessagingService.java +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/RedisMessagingService.java @@ -1,7 +1,11 @@ package org.bigbluebutton.voiceconf.messaging; +import java.util.HashMap; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.bigbluebutton.common2.msgs.*; +import com.google.gson.Gson; import org.bigbluebutton.voiceconf.messaging.messages.UserConnectedToGlobalAudio; import org.bigbluebutton.voiceconf.messaging.messages.UserDisconnectedFromGlobalAudio; import org.red5.logging.Red5LoggerFactory; @@ -13,7 +17,40 @@ public class RedisMessagingService implements IMessagingService { private static final Pattern CALLERNAME_PATTERN = Pattern.compile("(.*)-bbbID-(.*)$"); private MessageSender sender; - + + private Map<String, Object> buildEnvelope(String name, Map<String, String> routing) { + Map<String, Object> envelope = new HashMap<String, Object>(); + envelope.put("name", name); + envelope.put("routing", routing); + return envelope; + } + + private Map<String, String> buildRouting() { + Map<String, String> routing = new HashMap<String, String>(); + routing.put("msgType", "SYSTEM"); + routing.put("sender", "bbb-video"); + return routing; + } + + public void validateConnAuthToken(String meetingId, String userId, String authToken, String connId) { + BbbCoreBaseHeader header = new BbbCoreBaseHeader("ValidateConnAuthTokenSysMsg"); + ValidateConnAuthTokenSysMsgBody body = new ValidateConnAuthTokenSysMsgBody(meetingId, + userId, authToken, connId, "VIDEO"); + ValidateConnAuthTokenSysMsg msg = new ValidateConnAuthTokenSysMsg(header, body); + + Map<String, String> routing = buildRouting(); + Map<String, Object> envelope = buildEnvelope("ValidateConnAuthTokenSysMsg", routing); + + Map<String, Object> fullmsg = new HashMap<String, Object>(); + fullmsg.put("envelope", envelope); + fullmsg.put("core", msg); + + Gson gson = new Gson(); + String json = gson.toJson(fullmsg); + + sender.send("to-akka-apps-redis-channel", json); + } + @Override public void userConnectedToGlobalAudio(String voiceConf, String callerIdName) { @@ -33,7 +70,7 @@ public class RedisMessagingService implements IMessagingService { @Override public void userDisconnectedFromGlobalAudio(String voiceConf, String callerIdName) { Matcher matcher = CALLERNAME_PATTERN.matcher(callerIdName); - if (matcher.matches()) { + if (matcher.matches()) { String userid = matcher.group(1).trim(); String name = matcher.group(2).trim(); String json = new UserDisconnectedFromGlobalAudio(voiceConf, userid, name).toJson(); diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ClientMessage.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ClientMessage.java new file mode 100755 index 0000000000000000000000000000000000000000..ad1f609159f1ecc1f835d934dd1f258d1a53234c --- /dev/null +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ClientMessage.java @@ -0,0 +1,25 @@ +/** +* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ +* +* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). +* +* This program is free software; you can redistribute it and/or modify it under the +* terms of the GNU Lesser General Public License as published by the Free Software +* Foundation; either version 3.0 of the License, or (at your option) any later +* version. +* +* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License along +* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. +* +*/ +package org.bigbluebutton.voiceconf.messaging.messages; + + +public interface ClientMessage { + + String getMessageName(); +} diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ValidateConnTokenRespMsg.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ValidateConnTokenRespMsg.java new file mode 100755 index 0000000000000000000000000000000000000000..922d7adc00196d013a3a9f9b06ff1d9a3ebfadc9 --- /dev/null +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/messaging/messages/ValidateConnTokenRespMsg.java @@ -0,0 +1,20 @@ +package org.bigbluebutton.voiceconf.messaging.messages; + +public class ValidateConnTokenRespMsg implements ClientMessage { + + public final String meetingId; + public final String connId; + public final String userId; + public final Boolean authzed; + + public ValidateConnTokenRespMsg(String meetingId, String userId, Boolean authzed, String connId) { + this.meetingId = meetingId; + this.connId = connId; + this.authzed = authzed; + this.userId = userId; + } + + public String getMessageName() { + return "ValidateConnAuthTokenSysRespMsg"; + } +} diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/Application.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/Application.java index 9932ddd96329d025e91b701d120c5d9f5aa5f425..682ab351bfab818d6522f8dcf7c490363cbc37e7 100755 --- a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/Application.java +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/Application.java @@ -48,7 +48,9 @@ public class Application extends MultiThreadedApplicationAdapter { private String password = "secret"; private String username; private CallStreamFactory callStreamFactory; - + + private ConnectionInvokerService connInvokerService; + @Override public boolean appStart(IScope scope) { log.debug("VoiceConferenceApplication appStart[" + scope.getName() + "]"); @@ -279,4 +281,8 @@ public class Application extends MultiThreadedApplicationAdapter { if ((username == null) || ("".equals(username))) username = "UNKNOWN-CALLER"; return username; } + + public void setConnInvokerService(ConnectionInvokerService connInvokerService) { + this.connInvokerService = connInvokerService; + } } diff --git a/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/ConnectionInvokerService.java b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/ConnectionInvokerService.java new file mode 100755 index 0000000000000000000000000000000000000000..61ddb21852f9c50f85f8c920782a6184ebd70a9a --- /dev/null +++ b/bbb-voice/src/main/java/org/bigbluebutton/voiceconf/red5/ConnectionInvokerService.java @@ -0,0 +1,168 @@ +/** +* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ +* +* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). +* +* This program is free software; you can redistribute it and/or modify it under the +* terms of the GNU Lesser General Public License as published by the Free Software +* Foundation; either version 3.0 of the License, or (at your option) any later +* version. +* +* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License along +* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. +* +*/ +package org.bigbluebutton.voiceconf.red5; + +import com.google.gson.Gson; +import org.bigbluebutton.voiceconf.messaging.messages.ClientMessage; +import org.bigbluebutton.voiceconf.messaging.messages.ValidateConnTokenRespMsg; +import org.red5.logging.Red5LoggerFactory; +import org.red5.server.api.IConnection; +import org.red5.server.api.scope.IScope; +import org.slf4j.Logger; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + +public class ConnectionInvokerService { + private static Logger log = Red5LoggerFactory.getLogger(ConnectionInvokerService.class, "video"); + + private final String CONN = "RED5-"; + private static final int NTHREADS = 1; + private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); + private static final ExecutorService runExec = Executors.newFixedThreadPool(3); + + private BlockingQueue<ClientMessage> messages; + private volatile boolean sendMessages = false; + private IScope bbbAppScope; + + private final long SEND_TIMEOUT = 5000000000L; // 5s + + private Long lastMsgLengthLog = System.currentTimeMillis(); + + public ConnectionInvokerService() { + messages = new LinkedBlockingQueue<ClientMessage>(); + } + + public void setAppScope(IScope scope) { + bbbAppScope = scope; + } + + public void start() { + sendMessages = true; + Runnable sender = new Runnable() { + public void run() { + while (sendMessages) { + ClientMessage message; + try { + if (System.currentTimeMillis() - lastMsgLengthLog > 60000) { + lastMsgLengthLog = System.currentTimeMillis(); + log.info("Message queue length = " + messages.size()); + } + message = messages.take(); + if (log.isTraceEnabled()) { + log.trace("Took org.bigbluebutton.red5.pubsub.message from queue: " + message.getMessageName()); + } + sendMessageToClient(message); + if (log.isTraceEnabled()) { + log.trace("Sent org.bigbluebutton.red5.pubsub.message to client: " + message.getMessageName()); + } + } catch (Exception e) { + Marker sendingException = MarkerFactory.getMarker("SENDING_EXCEPTION"); + log.error(sendingException, "Exception while sending org.bigbluebutton.red5.pubsub.message to client.", e); + } + } + } + }; + exec.execute(sender); + } + + public void stop() { + sendMessages = false; + runExec.shutdown(); + } + + public void sendMessage(final ClientMessage message) { + if (log.isTraceEnabled()) { + log.trace("Queue org.bigbluebutton.red5.pubsub.message: " + message.getMessageName()); + } + messages.offer(message); + } + + private void sendMessageToClient(ClientMessage message) { + if (message instanceof ValidateConnTokenRespMsg) { + handleValidateConnTokenRespMsg((ValidateConnTokenRespMsg) message); + } + } + + private void handleValidateConnTokenRespMsg(ValidateConnTokenRespMsg msg) { + if (log.isTraceEnabled()) { + log.trace("Handle direct org.bigbluebutton.red5.pubsub.message: " + msg.getMessageName() + " conn=" + msg.connId); + } + + IScope meetingScope = getScope(msg.meetingId); + if (meetingScope != null) { + String userId = msg.userId; + IConnection conn = getConnection(meetingScope, userId); + if (conn != null) { + if (conn.isConnected() && !msg.authzed) { + Map<String, Object> logData = new HashMap<String, Object>(); + logData.put("meetingId", msg.meetingId); + logData.put("userId", userId); + logData.put("authzed", msg.authzed); + logData.put("app", "video"); + logData.put("event", "close_unauthorized_connection"); + logData.put("description", "Closing unauthorized connection."); + + Gson gson = new Gson(); + String logStr = gson.toJson(logData); + + log.info("Closing unauthorized connection: data={}", logStr); + conn.close(); + } + } + } + } + + private IConnection getConnectionWithConnId(IScope scope, String connId) { + for (IConnection conn : scope.getClientConnections()) { + String connID = (String) conn.getSessionId(); + if (connID != null && connID.equals(connId)) { + return conn; + } + } + + log.warn("Failed to get connection for connId = " + connId); + return null; + } + + private IConnection getConnection(IScope scope, String userId) { + for (IConnection conn : scope.getClientConnections()) { + String connID = (String) conn.getAttribute("USERID"); + if (connID != null && connID.equals(userId)) { + return conn; + } + } + + log.warn("Failed to get connection for userId = " + userId); + return null; + } + + public IScope getScope(String meetingID) { + if (bbbAppScope != null) { + return bbbAppScope.getContext().resolveScope("video"); + } else { + log.error("BigBlueButton Scope not initialized. No messages are going to the Flash client!"); + } + + return null; + } +} diff --git a/bbb-voice/src/main/webapp/WEB-INF/bbb-redis-messaging.xml b/bbb-voice/src/main/webapp/WEB-INF/bbb-redis-messaging.xml index 536588174e35f21fe0cebeb10b45bf050919758f..6c50ea7a70e7a2cbf49e6a7b0732dbf09048e3c7 100755 --- a/bbb-voice/src/main/webapp/WEB-INF/bbb-redis-messaging.xml +++ b/bbb-voice/src/main/webapp/WEB-INF/bbb-redis-messaging.xml @@ -27,32 +27,39 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. http://www.springframework.org/schema/util/spring-util-2.0.xsd "> - <bean id="messagingService" class="org.bigbluebutton.voiceconf.messaging.RedisMessagingService"> - <property name="redisMessageSender"> <ref bean="redisMessageSender"/></property> - </bean> - - <bean id="redisMessageSender" class="org.bigbluebutton.voiceconf.messaging.MessageSender" - init-method="start" destroy-method="stop"> - <property name="redisPool"> <ref bean="redisPool"/></property> - </bean> -<!-- - <bean id="redisMessageReceiver" class="org.bigbluebutton.voiceconf.messaging.MessageReceiver" - init-method="start" destroy-method="stop"> - <property name="redisPool"> <ref bean="redisPool"/></property> - <property name="messageHandler"> <ref local="redisMessageHandler"/> </property> - </bean> - - <bean id="redisMessageHandler" class="org.bigbluebutton.voiceconf.messaging.ReceivedMessageHandler" - init-method="start" destroy-method="stop"> - <property name="messageDistributor"><ref bean="redisMessageDistributor" /></property> - </bean> - - <bean id="redisMessageDistributor" class="org.bigbluebutton.voiceconf.messaging.MessageDistributor"> - <property name="messageHandler"> <ref local="redisMessageHandler"/> </property> - <property name="messageListeners"> - <set> - </set> - </property> - </bean> ---> + <bean id="messagingService" class="org.bigbluebutton.voiceconf.messaging.RedisMessagingService"> + <property name="redisMessageSender"> <ref bean="redisMessageSender"/></property> + </bean> + + <bean id="redisMessageSender" class="org.bigbluebutton.voiceconf.messaging.MessageSender" + init-method="start" destroy-method="stop"> + <property name="host" value="${redis.host}"/> + <property name="port" value="${redis.port}"/> + </bean> + + <bean id="meetingMessageHandler" class="org.bigbluebutton.red5.pubsub.MeetingMessageHandler"> + <property name="connInvokerService" ref="connInvokerService"/> + </bean> + + <bean id="redisMessageReceiver" class="org.bigbluebutton.voiceconf.messaging.MessageReceiver" + init-method="start" destroy-method="stop"> + <property name="host" value="${redis.host}"/> + <property name="port" value="${redis.port}"/> + <property name="messageHandler"> <ref local="redisMessageHandler"/> </property> + </bean> + + <bean id="redisMessageHandler" class="org.bigbluebutton.voiceconf.messaging.ReceivedMessageHandler" + init-method="start" destroy-method="stop"> + <property name="messageDistributor"><ref bean="redisMessageDistributor" /></property> + </bean> + + <bean id="redisMessageDistributor" class="org.bigbluebutton.voiceconf.messaging.MessageDistributor"> + <property name="messageHandler"> <ref local="redisMessageHandler"/> </property> + <property name="messageListeners"> + <set> + <ref bean="meetingMessageHandler" /> + </set> + </property> + </bean> + </beans>