diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api/MeetingService.java b/bbb-common-web/src/main/java/org/bigbluebutton/api/MeetingService.java index d25767b96013dad31aed32a1af5e664925b752c7..b2b8a0b4149c248d6052bb3998f4f5c10d0d3766 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api/MeetingService.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api/MeetingService.java @@ -91,7 +91,6 @@ public class MeetingService implements MessageListener { private RecordingService recordingService; private MessagingService messagingService; - private IPublisherService publisherService; private RegisteredUserCleanupTimerTask registeredUserCleaner; private StunTurnService stunTurnService; @@ -192,7 +191,7 @@ public class MeetingService implements MessageListener { } private void destroyMeeting(String meetingID) { - publisherService.destroyMeeting(meetingID); + messagingService.destroyMeeting(meetingID); } public Collection<Meeting> getMeetings() { @@ -264,13 +263,6 @@ public class MeetingService implements MessageListener { log.info("Create meeting: data={}", logStr); - publisherService.createMeeting(m.getInternalId(), m.getExternalId(), - m.getParentMeetingId(), m.getName(), m.isRecord(), - m.getTelVoice(), m.getDuration(), m.getAutoStartRecording(), - m.getAllowStartStopRecording(), m.getWebcamsOnlyForModerator(), - m.getModeratorPassword(), m.getViewerPassword(), - m.getCreateTime(), formatPrettyDate(m.getCreateTime()), - m.isBreakout(), m.getSequence(), m.getMetadata(), m.getGuestPolicy()); messagingService.createMeeting(m.getInternalId(), m.getExternalId(), m.getParentMeetingId(), m.getName(), m.isRecord(), @@ -291,7 +283,7 @@ public class MeetingService implements MessageListener { } private void processRegisterUser(RegisterUser message) { - publisherService.registerUser(message.meetingID, + messagingService.registerUser(message.meetingID, message.internalUserId, message.fullname, message.role, message.externUserID, message.authToken, message.avatarURL, message.guest, message.authed); } @@ -521,7 +513,7 @@ public class MeetingService implements MessageListener { } private void processEndMeeting(EndMeeting message) { - publisherService.endMeeting(message.meetingId); + messagingService.endMeeting(message.meetingId); } private void processRemoveEndedMeeting(MeetingEnded message) { @@ -759,7 +751,7 @@ public class MeetingService implements MessageListener { log.info("a org.bigbluebutton.web.services.turn: " + t.url + "username/pass=" + t.username + '/' + t.password); } - publisherService.sendStunTurnInfo(message.meetingId, + messagingService.sendStunTurnInfo(message.meetingId, message.internalUserId, stuns, turns); } @@ -924,9 +916,6 @@ public class MeetingService implements MessageListener { messagingService = mess; } - public void setPublisherService(IPublisherService mess) { - publisherService = mess; - } public void setRegisteredUserCleanupTimerTask( RegisteredUserCleanupTimerTask c) { diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessageSender.java b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessageSender.java index 5f3b830f6f46515963a199bfa4756ea1a21814d6..2ef5814b1763c17ad1a521d6bbba413d58a0fc76 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessageSender.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessageSender.java @@ -1,106 +1,16 @@ package org.bigbluebutton.api.messaging; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.Protocol; + +import org.bigbluebutton.api2.IBbbWebApiGWApp; public class MessageSender { - private static Logger log = LoggerFactory.getLogger(MessageSender.class); - - private JedisPool redisPool; - private volatile boolean sendMessage = false; - - private final Executor msgSenderExec = Executors.newSingleThreadExecutor(); - private final Executor runExec = Executors.newSingleThreadExecutor(); - private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>(); - - private String host; - private int port; - - public void stop() { - sendMessage = false; - redisPool.destroy(); - } - - public void start() { - GenericObjectPoolConfig config = new GenericObjectPoolConfig(); - config.setMaxTotal(32); - config.setMaxIdle(8); - config.setMinIdle(1); - config.setTestOnBorrow(true); - config.setTestOnReturn(true); - config.setTestWhileIdle(true); - config.setNumTestsPerEvictionRun(12); - config.setMaxWaitMillis(5000); - config.setTimeBetweenEvictionRunsMillis(60000); - config.setBlockWhenExhausted(true); - - // Set the name of this client to be able to distinguish when doing - // CLIENT LIST on redis-cli - redisPool = new JedisPool(config, host, port, Protocol.DEFAULT_TIMEOUT, null, - Protocol.DEFAULT_DATABASE, "BbbWebPub"); - - log.info("Redis message publisher starting!"); - try { - sendMessage = true; - - Runnable messageSender = new Runnable() { - public void run() { - while (sendMessage) { - try { - MessageToSend msg = messages.take(); - publish(msg.getChannel(), msg.getMessage()); - } catch (InterruptedException e) { - log.warn("Failed to get message from queue."); - } - } - } - }; - msgSenderExec.execute(messageSender); - } catch (Exception e) { - log.error("Error subscribing to channels: " + e.getMessage()); - } - } - + private IBbbWebApiGWApp gw; + public void send(String channel, String message) { - MessageToSend msg = new MessageToSend(channel, message); - messages.add(msg); + gw.send(channel, message); } - - private void publish(final String channel, final String message) { - Runnable task = new Runnable() { - public void run() { - Jedis jedis = redisPool.getResource(); - try { - if(channel.equalsIgnoreCase("bigbluebutton:from-bbb-apps:users") || channel.equalsIgnoreCase("bigbluebutton:from-bbb-apps:meeting")) - log.info("web-Publishing..." + channel + ":" + message); - jedis.publish(channel, message); - } catch(Exception e){ - log.warn("Cannot publish the message to pubsub", e); - } finally { - if (jedis != null) { - jedis.close(); - } - - } - } - }; - - runExec.execute(task); - } - - public void setHost(String host){ - this.host = host; - } - - public void setPort(int port) { - this.port = port; + + public void setGw(IBbbWebApiGWApp gw) { + this.gw = gw; } } diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessagingService.java b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessagingService.java index 73630687146f736a3b5f0f2ca6fa78c545d2b680..1671d594c0d9b7c98aa75b15cff651a860303804 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessagingService.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/MessagingService.java @@ -20,7 +20,11 @@ package org.bigbluebutton.api.messaging; +import org.bigbluebutton.web.services.turn.StunServer; +import org.bigbluebutton.web.services.turn.TurnEntry; + import java.util.Map; +import java.util.Set; public interface MessagingService { void recordMeetingInfo(String meetingId, Map<String, String> info); @@ -36,4 +40,10 @@ public interface MessagingService { String moderatorPass, String viewerPass, Long createTime, String createDate, Boolean isBreakout, Integer sequence, Map<String, String> metadata, String guestPolicy); + void registerUser(String meetingID, String internalUserId, String fullname, String role, + String externUserID, String authToken, String avatarURL, Boolean guest, Boolean authed); + void destroyMeeting(String meetingID); + void endMeeting(String meetingId); + void sendKeepAlive(String system, Long timestamp); + void sendStunTurnInfo(String meetingId, String internalUserId, Set<StunServer> stuns, Set<TurnEntry> turns); } diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java b/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java new file mode 100755 index 0000000000000000000000000000000000000000..955d5c9c1b71d059bfb890c4eb84649cc17c1421 --- /dev/null +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java @@ -0,0 +1,6 @@ +package org.bigbluebutton.api2; + + +public interface IBbbWebApiGWApp { + void send(String channel, String message); +} diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java index 7e0528ee4b9e7b62d6e5e4c5b2a69ee4f75c6e7a..2d28f3f86f09a1c8260db8e444bca02c29640492 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java @@ -37,13 +37,13 @@ import org.slf4j.LoggerFactory; public class KeepAliveService implements MessageListener { private static Logger log = LoggerFactory.getLogger(KeepAliveService.class); private final String KEEP_ALIVE_REQUEST = "KEEP_ALIVE_REQUEST"; - private IPublisherService service; + private MessagingService service; private long runEvery = 10000; private int maxLives = 5; private KeepAliveTask task = new KeepAliveTask(); private volatile boolean processMessages = false; - volatile boolean available = false; + volatile boolean available = true; private static final Executor msgSenderExec = Executors.newFixedThreadPool(1); private static final Executor runExec = Executors.newFixedThreadPool(1); @@ -70,7 +70,7 @@ public class KeepAliveService implements MessageListener { runEvery = v * 1000; } - public void setPublisherService(IPublisherService service){ + public void setPublisherService(MessagingService service){ this.service = service; } @@ -162,4 +162,4 @@ public class KeepAliveService implements MessageListener { handleKeepAliveReply(msg.system, msg.timestamp); } } -} \ No newline at end of file +} diff --git a/bbb-common-web/src/main/resources/reference.conf b/bbb-common-web/src/main/resources/reference.conf new file mode 100755 index 0000000000000000000000000000000000000000..f5ba5981ea95dd5f9aa8871e4a5e9b524822df44 --- /dev/null +++ b/bbb-common-web/src/main/resources/reference.conf @@ -0,0 +1,43 @@ +akka { + actor { + debug { + # enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill et.c.) + autoreceive = on + # enable DEBUG logging of actor lifecycle changes + lifecycle = on + } + } + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "DEBUG" + + rediscala-publish-worker-dispatcher { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 512 + } + + rediscala-subscriber-worker-dispatcher { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + # Throughput defines the maximum number of messages to be + # processed per actor before the thread jumps to the next actor. + # Set to 1 for as fair as possible. + throughput = 512 + } +} + +redis { + host="127.0.0.1" + port=6379 + password="" + # recording keys should expire in 14 days + keyExpiry=1209600 +} + +eventBus { + meetingManagerChannel = "MeetingManagerChannel" + outMessageChannel = "OutgoingMessageChannel" + incomingJsonMsgChannel = "IncomingJsonMsgChannel" + outBbbMsgMsgChannel = "OutBbbMsgChannel" +} \ No newline at end of file diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala index 7aba93e921c1ea426ee2b6fd993b8107dbaf2e9c..60b507769adc7ab1781c4e369fe38d2bc4c9a0d9 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala @@ -7,9 +7,9 @@ import org.bigbluebutton.api2.meeting.MeetingsManagerActor import scala.concurrent.duration._ -class BbbWebApiGWApp(val oldMessageReceivedGW: OldMessageReceivedGW) extends SystemConfiguration{ +class BbbWebApiGWApp(val oldMessageReceivedGW: OldMessageReceivedGW) extends IBbbWebApiGWApp with SystemConfiguration{ - implicit val system = ActorSystem("bbb-apps-common") + implicit val system = ActorSystem("bbb-web-common") implicit val timeout = akka.util.Timeout(3 seconds) private val jsonMsgToAkkaAppsBus = new JsonMsgToAkkaAppsBus diff --git a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml index 279d8c6e55a95d97b80791769afbd4a409313063..a406431d072e3990281bb1d41e30f4d1f14dc199 100755 --- a/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml +++ b/bigbluebutton-web/grails-app/conf/spring/bbb-redis-messaging.xml @@ -32,19 +32,7 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. <property name="host" value="${redisHost}" /> <property name="port" value="${redisPort}" /> </bean> - - <bean id="messageSender" class="org.bigbluebutton.api.messaging.MessageSender" - init-method="start" destroy-method="stop"> - <property name="host" value="${redisHost}" /> - <property name="port" value="${redisPort}" /> - </bean> - <bean id="redisMessageReceiver" class="org.bigbluebutton.api.messaging.MessageReceiver" - init-method="start" destroy-method="stop"> - <property name="host" value="${redisHost}" /> - <property name="port" value="${redisPort}" /> - <property name="messageHandler"> <ref local="redisMessageHandler"/> </property> - </bean> <bean id="redisMessageHandler" class="org.bigbluebutton.api.messaging.ReceivedMessageHandler" init-method="start" destroy-method="stop"> diff --git a/bigbluebutton-web/grails-app/conf/spring/resources.xml b/bigbluebutton-web/grails-app/conf/spring/resources.xml index 6657c67e3634b9ef8c8f3dc5ba5c50f007e9823f..338d7dfc4b4a31dae046cd4e53111875ed7bd23e 100755 --- a/bigbluebutton-web/grails-app/conf/spring/resources.xml +++ b/bigbluebutton-web/grails-app/conf/spring/resources.xml @@ -31,26 +31,33 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. <property name="redisStorageService" ref="redisStorageService"/> </bean> - <bean id="registeredUserCleanupTimerTask" class="org.bigbluebutton.web.services.RegisteredUserCleanupTimerTask"/> + <bean id="messageSender" class="org.bigbluebutton.api.messaging.MessageSender"> + <property name="gw" ref="bbbWebApiGWApp" /> + </bean> - <bean id="keepAliveService" class="org.bigbluebutton.web.services.KeepAliveService" - init-method="start" destroy-method="stop"> - <property name="runEvery" value="${checkBBBServerEvery}"/> - <property name="publisherService" ref="redisPublisher" /> - </bean> + <bean id="registeredUserCleanupTimerTask" class="org.bigbluebutton.web.services.RegisteredUserCleanupTimerTask"/> + + <bean id="keepAliveService" class="org.bigbluebutton.web.services.KeepAliveService" + init-method="start" destroy-method="stop"> + <property name="runEvery" value="${checkBBBServerEvery}"/> + <property name="publisherService" ref="messagingService" /> + </bean> <bean id="meetingService" class="org.bigbluebutton.api.MeetingService" init-method="start" destroy-method="stop"> <property name="messagingService" ref="messagingService"/> - <property name="publisherService" ref="redisPublisher"/> <property name="recordingService" ref="recordingService"/> <property name="presDownloadService" ref="presDownloadService"/> <property name="paramsProcessorUtil" ref="paramsProcessorUtil"/> <property name="stunTurnService" ref="stunTurnService"/> <property name="registeredUserCleanupTimerTask" ref="registeredUserCleanupTimerTask"/> - </bean> + </bean> + + <bean id="oldMessageReceivedGW" class="org.bigbluebutton.api2.bus.OldMessageReceivedGW"> + <constructor-arg index="0" ref="redisMessageHandler"/> + </bean> - <bean id="redisPublisher" class="org.bigbluebutton.api.pub.RedisPublisherService"> - <constructor-arg index="0" ref="messageSender"/> + <bean id="bbbWebApiGWApp" class="org.bigbluebutton.api2.BbbWebApiGWApp"> + <constructor-arg index="0" ref="oldMessageReceivedGW"/> </bean> <bean id="recordingServiceHelper" class="org.bigbluebutton.api.RecordingServiceHelperImp"/>