diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/SystemConfiguration.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/SystemConfiguration.scala index 92d157dc0f8449733dd64ed3d377a31c7a12b62f..72e0bbef7f35c810c812886b5cd593bb2d03c53c 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/SystemConfiguration.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/SystemConfiguration.scala @@ -43,4 +43,7 @@ trait SystemConfiguration { lazy val toAkkaAppsJsonChannel = Try(config.getString("eventBus.toAkkaAppsChannel")).getOrElse("to-akka-apps-json-channel") lazy val fromAkkaAppsJsonChannel = Try(config.getString("eventBus.fromAkkaAppsChannel")).getOrElse("from-akka-apps-json-channel") lazy val fromAkkaAppsOldJsonChannel = Try(config.getString("eventBus.fromAkkaAppsOldChannel")).getOrElse("from-akka-apps-old-json-channel") + + lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel") + lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-void-conf-redis-channel") } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala index e2f1f326bb756d375aef0439d67a202f890ab5b7..d3862481722192d2eec1c0cffc63f6822969d8b7 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/FromAkkaAppsMsgSenderActor.scala @@ -11,16 +11,16 @@ object FromAkkaAppsMsgSenderActor { def props(msgSender: MessageSender): Props = Props(classOf[FromAkkaAppsMsgSenderActor], msgSender) } -class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) extends Actor with ActorLogging with SystemConfiguration { +class FromAkkaAppsMsgSenderActor(msgSender: MessageSender) + extends Actor with ActorLogging with SystemConfiguration { def receive = { case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) - case _ => println("************* FromAkkaAppsMsgSenderActor Cannot handle message ") + case _ => log.warning("Cannot handle message ") } def handleBbbCommonEnvCoreMsg(msg: BbbCommonEnvCoreMsg): Unit = { val json = JsonUtil.toJson(msg) - println("****** Publishing " + json) msgSender.send(fromAkkaAppsRedisChannel, json) } } diff --git a/akka-bbb-fsesl/build.sbt b/akka-bbb-fsesl/build.sbt index 1abcc7e1ceda8107212773722cc98b326f2e900e..739e68acf75feea56af3247036f3328f642a3355 100755 --- a/akka-bbb-fsesl/build.sbt +++ b/akka-bbb-fsesl/build.sbt @@ -37,25 +37,51 @@ testOptions in Test += Tests.Argument(TestFrameworks.Specs2, "html", "console", testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-h", "target/scalatest-reports") +val akkaVersion = "2.5.1" +val scalaTestV = "2.2.6" + + libraryDependencies ++= { - val akkaVersion = "2.5.1" Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion, - "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "ch.qos.logback" % "logback-classic" % "1.0.3", - "org.pegdown" % "pegdown" % "1.4.0", "junit" % "junit" % "4.11", - "com.github.etaty" % "rediscala_2.12" % "1.8.0", "commons-codec" % "commons-codec" % "1.10", "joda-time" % "joda-time" % "2.3", - "com.google.code.gson" % "gson" % "1.7.1", - "redis.clients" % "jedis" % "2.1.0", - "org.apache.commons" % "commons-lang3" % "3.2", - "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT", - "org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4" + "org.apache.commons" % "commons-lang3" % "3.2" + )} +libraryDependencies += "org.bigbluebutton" % "bbb-common-message_2.12" % "0.0.19-SNAPSHOT" + +libraryDependencies += "org.bigbluebutton" % "bbb-fsesl-client" % "0.0.4" + +// https://mvnrepository.com/artifact/org.scala-lang/scala-library +libraryDependencies += "org.scala-lang" % "scala-library" % "2.12.2" +// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler +libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.2" + +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-actor_2.12" % akkaVersion + +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-slf4j_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-slf4j_2.12" % akkaVersion + +// https://mvnrepository.com/artifact/com.github.etaty/rediscala_2.12 +libraryDependencies += "com.github.etaty" % "rediscala_2.12" % "1.8.0" + +// For generating test reports +libraryDependencies += "org.pegdown" % "pegdown" % "1.6.0" % "test" +// https://mvnrepository.com/artifact/com.typesafe.akka/akka-testkit_2.12 +libraryDependencies += "com.typesafe.akka" % "akka-testkit_2.12" % "2.5.1" % "test" + +// https://mvnrepository.com/artifact/org.scalactic/scalactic_2.12 +libraryDependencies += "org.scalactic" % "scalactic_2.12" % "3.0.3" % "test" + +// https://mvnrepository.com/artifact/org.scalatest/scalatest_2.12 +libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.3" % "test" + +libraryDependencies += "org.mockito" % "mockito-core" % "2.7.22" % "test" + seq(Revolver.settings: _*) scalariformSettings diff --git a/akka-bbb-fsesl/src/main/java/org/bigbluebutton/freeswitch/voice/freeswitch/FreeswitchApplication.java b/akka-bbb-fsesl/src/main/java/org/bigbluebutton/freeswitch/voice/freeswitch/FreeswitchApplication.java index 3a178e28744820e1cfbad0d0e504d591f93a6b4e..d39365f7cd45b1add48bfea8b89bdc2541dff421 100755 --- a/akka-bbb-fsesl/src/main/java/org/bigbluebutton/freeswitch/voice/freeswitch/FreeswitchApplication.java +++ b/akka-bbb-fsesl/src/main/java/org/bigbluebutton/freeswitch/voice/freeswitch/FreeswitchApplication.java @@ -1,21 +1,20 @@ /** -* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ -* -* Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below). -* -* This program is free software; you can redistribute it and/or modify it under the -* terms of the GNU Lesser General Public License as published by the Free Software -* Foundation; either version 3.0 of the License, or (at your option) any later -* version. -* -* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY -* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public License along -* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. -* -*/ + * BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ + * <p> + * Copyright (c) 2015 BigBlueButton Inc. and by respective authors (see below). + * <p> + * This program is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation; either version 3.0 of the License, or (at your option) any later + * version. + * <p> + * BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * <p> + * You should have received a copy of the GNU Lesser General Public License along + * with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. + */ package org.bigbluebutton.freeswitch.voice.freeswitch; import java.io.File; @@ -24,6 +23,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; + import org.bigbluebutton.freeswitch.voice.freeswitch.actions.BroadcastConferenceCommand; import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectAllUsersCommand; import org.bigbluebutton.freeswitch.voice.freeswitch.actions.EjectUserCommand; @@ -36,149 +36,148 @@ import org.bigbluebutton.freeswitch.voice.freeswitch.actions.*; public class FreeswitchApplication { - private static final int SENDERTHREADS = 1; - private static final Executor msgSenderExec = Executors - .newFixedThreadPool(SENDERTHREADS); - private static final Executor runExec = Executors - .newFixedThreadPool(SENDERTHREADS); - private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>(); - - private final ConnectionManager manager; - - private final String USER = "0"; /* not used for now */ - - private volatile boolean sendMessages = false; - - private final String audioProfile; - - public FreeswitchApplication(ConnectionManager manager, String profile) { - this.manager = manager; - this.audioProfile = profile; - } - - private void queueMessage(FreeswitchCommand command) { - try { - messages.offer(command, 5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - public void transferUserToMeeting(String voiceConfId, - String targetVoiceConfId, String voiceUserId) { - TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand( - voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile, - USER); - queueMessage(tutmc); - } + private static final int SENDERTHREADS = 1; + private static final Executor msgSenderExec = Executors.newFixedThreadPool(SENDERTHREADS); + private static final Executor runExec = Executors.newFixedThreadPool(SENDERTHREADS); + private BlockingQueue<FreeswitchCommand> messages = new LinkedBlockingQueue<FreeswitchCommand>(); + + private final ConnectionManager manager; + + private final String USER = "0"; /* not used for now */ - public void start() { - sendMessages = true; - Runnable sender = new Runnable() { - public void run() { - while (sendMessages) { - FreeswitchCommand message; - try { - message = messages.take(); - sendMessageToFreeswitch(message); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - }; - msgSenderExec.execute(sender); - } - - public void getAllUsers(String voiceConfId) { - GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER); - queueMessage(prc); - } - - public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) { - MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER); - queueMessage(mpc); - } - - public void eject(String voiceConfId, String voiceUserId) { - EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER); - queueMessage(mpc); - } - - public void ejectAll(String voiceConfId) { - EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER); - queueMessage(mpc); - } - - private Long genTimestamp() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - - public void startRecording(String voiceConfId, String meetingid){ - String RECORD_DIR = "/var/freeswitch/meetings"; - String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav"; - - RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath); - queueMessage(rcc); - } - - public void stopRecording(String voiceConfId, String meetingid, String voicePath){ - RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath); - queueMessage(rcc); - } - - public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast){ - DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast); - queueMessage(rtmp); - } - - public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp){ - DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp); - queueMessage(huCmd); - } - private void sendMessageToFreeswitch(final FreeswitchCommand command) { - Runnable task = new Runnable() { - public void run() { - if (command instanceof GetAllUsersCommand) { - GetAllUsersCommand cmd = (GetAllUsersCommand) command; - System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]"); - manager.getUsers(cmd); - } else if (command instanceof MuteUserCommand) { - MuteUserCommand cmd = (MuteUserCommand) command; - System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]"); - manager.mute(cmd); - } else if (command instanceof EjectUserCommand) { - EjectUserCommand cmd = (EjectUserCommand) command; - System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]"); - manager.eject(cmd); - } else if (command instanceof EjectAllUsersCommand) { - EjectAllUsersCommand cmd = (EjectAllUsersCommand) command; - System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]"); - manager.ejectAll(cmd); - } else if (command instanceof TransferUserToMeetingCommand) { - TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command; - System.out.println("Sending TransferUsetToMeetingCommand for conference = [" - + cmd.getRoom() + "]"); - manager.tranfer(cmd); - } else if (command instanceof RecordConferenceCommand) { - manager.record((RecordConferenceCommand) command); - } else if (command instanceof DeskShareBroadcastRTMPCommand) { - manager.broadcastRTMP((DeskShareBroadcastRTMPCommand)command); - } else if (command instanceof DeskShareHangUpCommand) { - DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command; - manager.hangUp(cmd); - } else if (command instanceof BroadcastConferenceCommand) { - manager.broadcast((BroadcastConferenceCommand) command); - } - } - }; - - runExec.execute(task); - } - - public void stop() { - sendMessages = false; - } + private volatile boolean sendMessages = false; + + private final String audioProfile; + + public FreeswitchApplication(ConnectionManager manager, String profile) { + this.manager = manager; + this.audioProfile = profile; + } + + private void queueMessage(FreeswitchCommand command) { + try { + messages.offer(command, 5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void transferUserToMeeting(String voiceConfId, + String targetVoiceConfId, String voiceUserId) { + TransferUserToMeetingCommand tutmc = new TransferUserToMeetingCommand( + voiceConfId, targetVoiceConfId, voiceUserId, this.audioProfile, + USER); + queueMessage(tutmc); + } + + public void start() { + sendMessages = true; + Runnable sender = new Runnable() { + public void run() { + while (sendMessages) { + FreeswitchCommand message; + try { + message = messages.take(); + sendMessageToFreeswitch(message); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }; + msgSenderExec.execute(sender); + } + + public void getAllUsers(String voiceConfId) { + GetAllUsersCommand prc = new GetAllUsersCommand(voiceConfId, USER); + queueMessage(prc); + } + + public void muteUser(String voiceConfId, String voiceUserId, Boolean mute) { + MuteUserCommand mpc = new MuteUserCommand(voiceConfId, voiceUserId, mute, USER); + queueMessage(mpc); + } + + public void eject(String voiceConfId, String voiceUserId) { + EjectUserCommand mpc = new EjectUserCommand(voiceConfId, voiceUserId, USER); + queueMessage(mpc); + } + + public void ejectAll(String voiceConfId) { + EjectAllUsersCommand mpc = new EjectAllUsersCommand(voiceConfId, USER); + queueMessage(mpc); + } + + private Long genTimestamp() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + } + + public void startRecording(String voiceConfId, String meetingid) { + String RECORD_DIR = "/var/freeswitch/meetings"; + String voicePath = RECORD_DIR + File.separatorChar + meetingid + "-" + genTimestamp() + ".wav"; + + RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, true, voicePath); + queueMessage(rcc); + } + + public void stopRecording(String voiceConfId, String meetingid, String voicePath) { + RecordConferenceCommand rcc = new RecordConferenceCommand(voiceConfId, USER, false, voicePath); + queueMessage(rcc); + } + + public void deskShareBroadcastRTMP(String voiceConfId, String streamUrl, String timestamp, Boolean broadcast) { + DeskShareBroadcastRTMPCommand rtmp = new DeskShareBroadcastRTMPCommand(voiceConfId, USER, streamUrl, timestamp, broadcast); + queueMessage(rtmp); + } + + public void deskShareHangUp(String voiceConfId, String fsConferenceName, String timestamp) { + DeskShareHangUpCommand huCmd = new DeskShareHangUpCommand(voiceConfId, fsConferenceName, USER, timestamp); + queueMessage(huCmd); + } + + private void sendMessageToFreeswitch(final FreeswitchCommand command) { + Runnable task = new Runnable() { + public void run() { + if (command instanceof GetAllUsersCommand) { + GetAllUsersCommand cmd = (GetAllUsersCommand) command; + System.out.println("Sending PopulateRoomCommand for conference = [" + cmd.getRoom() + "]"); + manager.getUsers(cmd); + } else if (command instanceof MuteUserCommand) { + MuteUserCommand cmd = (MuteUserCommand) command; + System.out.println("Sending MuteParticipantCommand for conference = [" + cmd.getRoom() + "]"); + manager.mute(cmd); + } else if (command instanceof EjectUserCommand) { + EjectUserCommand cmd = (EjectUserCommand) command; + System.out.println("Sending EjectParticipantCommand for conference = [" + cmd.getRoom() + "]"); + manager.eject(cmd); + } else if (command instanceof EjectAllUsersCommand) { + EjectAllUsersCommand cmd = (EjectAllUsersCommand) command; + System.out.println("Sending EjectAllUsersCommand for conference = [" + cmd.getRoom() + "]"); + manager.ejectAll(cmd); + } else if (command instanceof TransferUserToMeetingCommand) { + TransferUserToMeetingCommand cmd = (TransferUserToMeetingCommand) command; + System.out.println("Sending TransferUsetToMeetingCommand for conference = [" + + cmd.getRoom() + "]"); + manager.tranfer(cmd); + } else if (command instanceof RecordConferenceCommand) { + manager.record((RecordConferenceCommand) command); + } else if (command instanceof DeskShareBroadcastRTMPCommand) { + manager.broadcastRTMP((DeskShareBroadcastRTMPCommand) command); + } else if (command instanceof DeskShareHangUpCommand) { + DeskShareHangUpCommand cmd = (DeskShareHangUpCommand) command; + manager.hangUp(cmd); + } else if (command instanceof BroadcastConferenceCommand) { + manager.broadcast((BroadcastConferenceCommand) command); + } + } + }; + + runExec.execute(task); + } + + public void stop() { + sendMessages = false; + } } diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala index 7c6e0d4eb7937ba54dbf51c131832e4173bfe584..9abd94ab99525008770eb30d04d89f6ada4bb23e 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/Boot.scala @@ -1,16 +1,13 @@ package org.bigbluebutton import akka.actor.{ ActorSystem, Props } -import scala.concurrent.duration._ -import redis.RedisClient -import scala.concurrent.{ Future, Await } -import scala.concurrent.ExecutionContext.Implicits.global + import org.freeswitch.esl.client.manager.DefaultManagerConnection -import org.bigbluebutton.endpoint.redis.{ RedisPublisher, AppsRedisSubscriberActor } -import org.bigbluebutton.freeswitch.VoiceConferenceService +import org.bigbluebutton.endpoint.redis.{ AppsRedisSubscriberActor, RedisPublisher } +import org.bigbluebutton.freeswitch.{ RxJsonMsgHdlrActor, VoiceConferenceService } +import org.bigbluebutton.freeswitch.bus.InsonMsgBus import org.bigbluebutton.freeswitch.voice.FreeswitchConferenceEventListener -import org.bigbluebutton.freeswitch.voice.freeswitch.{ ESLEventListener, ConnectionManager, FreeswitchApplication } -import org.bigbluebutton.freeswitch.voice.IVoiceConferenceService +import org.bigbluebutton.freeswitch.voice.freeswitch.{ ConnectionManager, ESLEventListener, FreeswitchApplication } import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver object Boot extends App with SystemConfiguration { @@ -19,7 +16,7 @@ object Boot extends App with SystemConfiguration { val redisPublisher = new RedisPublisher(system) - val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword); + val eslConnection = new DefaultManagerConnection(eslHost, eslPort, eslPassword) val voiceConfService = new VoiceConferenceService(redisPublisher) @@ -36,5 +33,10 @@ object Boot extends App with SystemConfiguration { val redisMsgReceiver = new RedisMessageReceiver(fsApplication) - val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver), "redis-subscriber") + val inJsonMsgBus = new InsonMsgBus + val redisMessageHandlerActor = system.actorOf(RxJsonMsgHdlrActor.props(fsApplication)) + inJsonMsgBus.subscribe(redisMessageHandlerActor, toFsAppsJsonChannel) + + val redisSubscriberActor = system.actorOf(AppsRedisSubscriberActor.props(system, redisMsgReceiver, inJsonMsgBus), "redis-subscriber") + } diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/SystemConfiguration.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/SystemConfiguration.scala index 8ef716eba5d5cd2130bbfe15f698bb66df2e396f..ed9acc8ce49f031bba8482580a91f1aee136ff90 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/SystemConfiguration.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/SystemConfiguration.scala @@ -16,4 +16,8 @@ trait SystemConfiguration { lazy val redisPort = Try(config.getInt("redis.port")).getOrElse(6379) lazy val redisPassword = Try(config.getString("redis.password")).getOrElse("") -} \ No newline at end of file + lazy val toVoiceConfRedisChannel = Try(config.getString("redis.toVoiceConfRedisChannel")).getOrElse("to-voice-conf-redis-channel") + lazy val fromVoiceConfRedisChannel = Try(config.getString("redis.fromVoiceConfRedisChannel")).getOrElse("from-void-conf-redis-channel") + lazy val toFsAppsJsonChannel = Try(config.getString("eventBus.toFsAppsChannel")).getOrElse("to-fs-apps-json-channel") + lazy val fromFsAppsJsonChannel = Try(config.getString("eventBus.fromFsAppsChannel")).getOrElse("from-fs-apps-json-channel") +} diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala index 15a1a31857edc93ddf91111cb9cd77e47c43124d..f6cff44d915dc876df0a0152215be023798636bd 100755 --- a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/endpoint/redis/AppsRedisSubscriberActor.scala @@ -6,16 +6,13 @@ import java.net.InetSocketAddress import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.DurationInt - import org.bigbluebutton.SystemConfiguration -import org.bigbluebutton.common.converters.FromJsonDecoder -import org.bigbluebutton.common.messages.PubSubPongMessage import org.bigbluebutton.freeswitch.pubsub.receivers.RedisMessageReceiver - import akka.actor.ActorSystem import akka.actor.OneForOneStrategy import akka.actor.Props import akka.actor.SupervisorStrategy.Resume +import org.bigbluebutton.freeswitch.bus.{ InJsonMsg, InsonMsgBus, ReceivedJsonMsg } import redis.actors.RedisSubscriberActor import redis.api.pubsub.Message import redis.api.pubsub.PMessage @@ -26,13 +23,14 @@ object AppsRedisSubscriberActor extends SystemConfiguration { val channels = Seq("time") val patterns = Seq("bigbluebutton:to-voice-conf:*", "bigbluebutton:from-bbb-apps:*") - def props(system: ActorSystem, msgReceiver: RedisMessageReceiver): Props = - Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, + def props(system: ActorSystem, msgReceiver: RedisMessageReceiver, inJsonMgBus: InsonMsgBus): Props = + Props(classOf[AppsRedisSubscriberActor], system, msgReceiver, inJsonMgBus, redisHost, redisPort, channels, patterns).withDispatcher("akka.rediscala-subscriber-worker-dispatcher") } -class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, redisHost: String, +class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessageReceiver, + inJsonMgBus: InsonMsgBus, redisHost: String, redisPort: Int, channels: Seq[String] = Nil, patterns: Seq[String] = Nil) extends RedisSubscriberActor(new InetSocketAddress(redisHost, redisPort), @@ -48,7 +46,7 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag } } - val decoder = new FromJsonDecoder() + // val decoder = new FromJsonDecoder() var lastPongReceivedOn = 0L system.scheduler.schedule(10 seconds, 10 seconds)(checkPongMessage()) @@ -67,26 +65,17 @@ class AppsRedisSubscriberActor(val system: ActorSystem, msgReceiver: RedisMessag def onMessage(message: Message) { log.debug(s"message received: $message") + if (message.channel == toVoiceConfRedisChannel) { + val receivedJsonMessage = new ReceivedJsonMsg(message.channel, message.data.utf8String) + log.debug(s"RECEIVED:\n [${receivedJsonMessage.channel}] \n ${receivedJsonMessage.data} \n") + inJsonMgBus.publish(InJsonMsg(toFsAppsJsonChannel, receivedJsonMessage)) + } } def onPMessage(pmessage: PMessage) { // log.debug(s"pattern message received: $pmessage") - val msg = decoder.decodeMessage(pmessage.data.utf8String) - - if (msg != null) { - msg match { - case m: PubSubPongMessage => { - if (m.payload.system == "BbbFsESL") { - lastPongReceivedOn = System.currentTimeMillis() - } - } - case _ => // do nothing - } - } else { - msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String) - } - + msgReceiver.handleMessage(pmessage.patternMatched, pmessage.channel, pmessage.data.utf8String) } def handleMessage(msg: String) { diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgDeserializer.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgDeserializer.scala new file mode 100755 index 0000000000000000000000000000000000000000..c265b73792a0ea3b79931675cd51532d0315a379 --- /dev/null +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgDeserializer.scala @@ -0,0 +1,7 @@ +package org.bigbluebutton.freeswitch + +import org.bigbluebutton.common2.messages.Deserializer + +trait RxJsonMsgDeserializer { + object JsonDeserializer extends Deserializer +} diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala new file mode 100755 index 0000000000000000000000000000000000000000..421b65844691b831206447eb4631f87d369335fb --- /dev/null +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/RxJsonMsgHdlrActor.scala @@ -0,0 +1,36 @@ +package org.bigbluebutton.freeswitch + +import akka.actor.{ Actor, ActorLogging, Props } +import com.fasterxml.jackson.databind.JsonNode +import org.bigbluebutton.SystemConfiguration +import org.bigbluebutton.common2.messages.BbbCoreEnvelope +import org.bigbluebutton.freeswitch.bus.{ ReceivedJsonMsg } +import org.bigbluebutton.freeswitch.voice.freeswitch.FreeswitchApplication + +object RxJsonMsgHdlrActor { + def props(fsApp: FreeswitchApplication): Props = + Props(classOf[RxJsonMsgHdlrActor], fsApp) +} + +class RxJsonMsgHdlrActor(fsApp: FreeswitchApplication) extends Actor with ActorLogging + with SystemConfiguration with RxJsonMsgDeserializer { + def receive = { + case msg: ReceivedJsonMsg => + log.debug("handling {} - {}", msg.channel, msg.data) + handleReceivedJsonMessage(msg) + case _ => // do nothing + } + + def handleReceivedJsonMessage(msg: ReceivedJsonMsg): Unit = { + for { + envJsonNode <- JsonDeserializer.toBbbCommonEnvJsNodeMsg(msg.data) + } yield handle(envJsonNode.envelope, envJsonNode.core) + } + + def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { + log.debug("Route envelope name " + envelope.name) + envelope.name match { + case _ => // do nothing + } + } +} diff --git a/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala new file mode 100755 index 0000000000000000000000000000000000000000..6f7bb865ea8c1002d7f6ff16655a3c2eb016568e --- /dev/null +++ b/akka-bbb-fsesl/src/main/scala/org/bigbluebutton/freeswitch/bus/InJsonMsgBus.scala @@ -0,0 +1,31 @@ +package org.bigbluebutton.freeswitch.bus + +import akka.actor.ActorRef +import akka.event.{ EventBus, LookupClassification } + +case class ReceivedJsonMsg(channel: String, data: String) +case class InJsonMsg(val topic: String, val payload: ReceivedJsonMsg) + +class InsonMsgBus extends EventBus with LookupClassification { + type Event = InJsonMsg + type Classifier = String + type Subscriber = ActorRef + + // is used for extracting the classifier from the incoming events + override protected def classify(event: Event): Classifier = event.topic + + // will be invoked for each event for all subscribers which registered themselves + // for the event’s classifier + override protected def publish(event: Event, subscriber: Subscriber): Unit = { + subscriber ! event.payload + } + + // must define a full order over the subscribers, expressed as expected from + // `java.lang.Comparable.compare` + override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = + a.compareTo(b) + + // determines the initial size of the index data structure + // used internally (i.e. the expected number of different classifiers) + override protected def mapSize: Int = 128 +} diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Messages.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Messages.scala index 5e395492072b35e839a9917198d26451089f56ac..a73e1fc632350c502e20d011263b27147be27e31 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Messages.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/Messages.scala @@ -10,9 +10,9 @@ object MessageTypes { } // seal trait to force all classes that extends this trait to be defined in this file. -sealed trait BbbCoreMsg +trait BbbCoreMsg sealed trait BbbCommonMsg -sealed trait BbbCoreHeader +trait BbbCoreHeader case class RoutingEnvelope(msgType: String, meetingId: String, userId: String) case class BbbMsgToClientEnvelope(name: String, routing: RoutingEnvelope) @@ -56,6 +56,8 @@ case class UserBroadcastCamStartMsg(header: BbbClientMsgHeader, body: UserBroadc object UserBroadcastCamStopMsg { val NAME = "UserBroadcastCamStopMsg"} case class UserBroadcastCamStopMsg(header: BbbClientMsgHeader, body: UserBroadcastCamStopMsgBody) extends BbbCoreMsg + + /** Event messages sent by Akka apps as result of receiving incoming messages ***/ object MeetingCreatedEvtMsg { val NAME = "MeetingCreatedEvtMsg"} case class MeetingCreatedEvtMsg(header: BbbCoreBaseHeader, diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/voiceconf/Messages.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/voiceconf/Messages.scala new file mode 100755 index 0000000000000000000000000000000000000000..191596ecded785be087d81ad686ac33d69bd05f3 --- /dev/null +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/messages/voiceconf/Messages.scala @@ -0,0 +1,106 @@ +package org.bigbluebutton.common2.messages.voiceconf + +import org.bigbluebutton.common2.messages.{BbbCoreHeader, BbbCoreHeaderWithMeetingId, BbbCoreMsg} + +/*** Message from Akka Apps to FS Conference ***/ +object EjectAllFromVoiceConfMsg { val NAME = "EjectAllFromVoiceConfMsg" } +case class EjectAllFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: EjectAllFromVoiceConfMsgBody) extends BbbCoreMsg +case class EjectAllFromVoiceConfMsgBody(voiceConf: String) + +object EjectUserFromVoiceConfMsg { val NAME = "EjectUserFromVoiceConfMsg"} +case class EjectUserFromVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: EjectUserFromVoiceConfMsgBody) extends BbbCoreMsg +case class EjectUserFromVoiceConfMsgBody(voiceConf: String, voiceUserId: String) + +object MuteUserInVoiceConfMsg { val NAME = "MuteUserInVoiceConfMsg" } +case class MuteUserInVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: MuteUserInVoiceConfMsgBody) extends BbbCoreMsg +case class MuteUserInVoiceConfMsgBody(voiceConf: String, voiceUserId: String, mute: Boolean) + +object TransferUserToVoiceConfMsg { val NAME = "TransferUserToVoiceConfMsg" } +case class TransferUserToVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: TransferUserToVoiceConfMsgBody) extends BbbCoreMsg +case class TransferUserToVoiceConfMsgBody(fromVoiceConf: String, toVoiceConf: String, voiceUserId: String) + +object StartRecordingVoiceConfMsg { val NAME = "StartRecordingVoiceConfMsg" } +case class StartRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: StartRecordingVoiceConfMsgBody) extends BbbCoreMsg +case class StartRecordingVoiceConfMsgBody(voiceConf: String) + +object StopRecordingVoiceConfMsg { val NAME = "StopRecordingVoiceConfMsg" } +case class StopRecordingVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: StopRecordingVoiceConfMsgBody) extends BbbCoreMsg +case class StopRecordingVoiceConfMsgBody(voiceConf: String, stream: String) + +object DeskshareStartRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStartRtmpBroadcastVoiceConfMsg" } +case class DeskshareStartRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: DeskshareStartRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg +case class DeskshareStartRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String) + +object DeskshareStopRtmpBroadcastVoiceConfMsg { val NAME = "DeskshareStopRtmpBroadcastVoiceConfMsg" } +case class DeskshareStopRtmpBroadcastVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: DeskshareStopRtmpBroadcastVoiceConfMsgBody) extends BbbCoreMsg +case class DeskshareStopRtmpBroadcastVoiceConfMsgBody(voiceConf: String, deskshareConf: String, url: String, timestamp: String) + +object DeskshareHangUpVoiceConfMsg { val NAME = "DeskshareHangUpVoiceConfMsg" } +case class DeskshareHangUpVoiceConfMsg(header: BbbCoreHeaderWithMeetingId, + body: DeskshareHangUpVoiceConfMsgBody) extends BbbCoreMsg +case class DeskshareHangUpVoiceConfMsgBody(voiceConf: String, deskshareConf: String, timestamp: String) + + +/*** Message from FS Conference to Akka Apps ***/ +case class BbbCoreVoiceConfHeader(name: String, voiceConf: String) extends BbbCoreHeader + +object RecordingStartedVoiceConfEvtMsg { val NAME = "RecordingStartedVoiceConfEvtMsg" } +case class RecordingStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: RecordingStartedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class RecordingStartedVoiceConfEvtMsgBody(voiceConf: String, stream: String, recording: Boolean, timestamp: String) + +object UserJoinedVoiceConfEvtMsg { val NAME = "UserJoinedVoiceConfEvtMsg" } +case class UserJoinedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: UserJoinedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class UserJoinedVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, userId: String, + callerIdName: String, callerIdNum: String, muted: Boolean, + talking: Boolean, avatarUrl: String) + +object UserLeftVoiceConfEvtMsg { val NAME = "UserLeftVoiceConfEvtMsg" } +case class UserLeftVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: UserLeftVoiceConfEvtMsgBody) extends BbbCoreMsg +case class UserLeftVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String) + +object UserMutedInVoiceConfEvtMsg { val NAME = "UserMutedInVoiceConfEvtMsg" } +case class UserMutedInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: UserMutedInVoiceConfEvtMsgBody) extends BbbCoreMsg +case class UserMutedInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, muted: Boolean) + +object UserTalkingInVoiceConfEvtMsg { val NAME = "UserTalkingInVoiceConfEvtMsg" } +case class UserTalkingInVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: UserTalkingInVoiceConfEvtMsgBody) extends BbbCoreMsg +case class UserTalkingInVoiceConfEvtMsgBody(voiceConf: String, voiceUserId: String, talking: Boolean) + +object DeskshareStartedVoiceConfEvtMsg { val NAME = "DeskshareStartedVoiceConfEvtMsg" } +case class DeskshareStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: DeskshareStartedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class DeskshareStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String, + callerIdNum: String, callerIdName: String) + +object DeskshareStoppedVoiceConfEvtMsg { val NAME = "DeskshareStoppedVoiceConfEvtMsg"} +case class DeskshareStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: DeskshareStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class DeskshareStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String, + callerIdNum: String, callerIdName: String) + +object DeskshareRtmpBroadcastStartedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStartedVoiceConfEvtMsg"} +case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class DeskshareRtmpBroadcastStartedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String, + stream: String, vidWidth: String, vidHeight: String, + timestamp: String) + +object DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg { val NAME = "DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg"} +case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsg(header: BbbCoreVoiceConfHeader, + body: DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody) extends BbbCoreMsg +case class DeskshareRtmpBroadcastStoppedVoiceConfEvtMsgBody(voiceConf: String, deskshareConf: String, + stream: String, vidWidth: String, vidHeight: String, + timestamp: String)