diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgDeserializer.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgDeserializer.scala index 970a2cd3c84ddc6fa891acc3d5ad2ae5ae1d2d47..9016b7b324a608a02632ae5e5bb73d83196ae445 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgDeserializer.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgDeserializer.scala @@ -3,287 +3,22 @@ package org.bigbluebutton.core.pubsub.senders import com.fasterxml.jackson.databind.JsonNode import org.bigbluebutton.SystemConfiguration import org.bigbluebutton.common2.messages._ -import org.bigbluebutton.common2.messages.voiceconf._ -import org.bigbluebutton.core.bus.BbbMsgEvent trait ReceivedJsonMsgDeserializer extends SystemConfiguration { this: ReceivedJsonMsgHandlerActor => object JsonDeserializer extends Deserializer - def routeCreateMeetingReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[CreateMeetingReqMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[CreateMeetingReqMsg](jsonNode) - result match { - case Some(msg) => Some(msg.asInstanceOf[CreateMeetingReqMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: CreateMeetingReqMsg): Unit = { - val event = BbbMsgEvent(meetingManagerChannel, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeValidateAuthTokenReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[ValidateAuthTokenReqMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[ValidateAuthTokenReqMsg](jsonNode) - - result match { - case Some(msg) => Some(msg.asInstanceOf[ValidateAuthTokenReqMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: ValidateAuthTokenReqMsg): Unit = { - val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeRegisterUserReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[RegisterUserReqMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[RegisterUserReqMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[RegisterUserReqMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: RegisterUserReqMsg): Unit = { - // Route via meeting manager as there is a race condition if we send directly to meeting - // because the meeting actor might not have been created yet. - val event = BbbMsgEvent(meetingManagerChannel, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserJoinMeetingReqMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserJoinMeetingReqMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserJoinMeetingReqMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserJoinMeetingReqMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserJoinMeetingReqMsg): Unit = { - // Route via meeting manager as there is a race condition if we send directly to meeting - // because the meeting actor might not have been created yet. - val event = BbbMsgEvent(msg.header.userId, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserBroadcastCamStartMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserBroadcastCamStartMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserBroadcastCamStartMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserBroadcastCamStartMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserBroadcastCamStartMsg): Unit = { - val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserBroadcastCamStopMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserBroadcastCamStopMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserBroadcastCamStopMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserBroadcastCamStopMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } + def deserialize[T](jsonNode: JsonNode)(implicit m: Manifest[T]): Option[T] = { + val (result, error) = JsonDeserializer.toBbbCommonMsg[T](jsonNode) - def send(envelope: BbbCoreEnvelope, msg: UserBroadcastCamStopMsg): Unit = { - val event = BbbMsgEvent(msg.header.meetingId, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) + result match { + case Some(msg) => + Some(msg.asInstanceOf[T]) + case None => + log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) + None } } - def routeRecordingStartedVoiceConfEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[RecordingStartedVoiceConfEvtMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[RecordingStartedVoiceConfEvtMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[RecordingStartedVoiceConfEvtMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: RecordingStartedVoiceConfEvtMsg): Unit = { - val event = BbbMsgEvent(msg.header.voiceConf, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserJoinedVoiceConfEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserJoinedVoiceConfEvtMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserJoinedVoiceConfEvtMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserJoinedVoiceConfEvtMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserJoinedVoiceConfEvtMsg): Unit = { - val event = BbbMsgEvent(msg.header.voiceConf, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserLeftVoiceConfEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserLeftVoiceConfEvtMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserLeftVoiceConfEvtMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserLeftVoiceConfEvtMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserLeftVoiceConfEvtMsg): Unit = { - val event = BbbMsgEvent(msg.header.voiceConf, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserMutedInVoiceConfEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserMutedInVoiceConfEvtMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserMutedInVoiceConfEvtMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserMutedInVoiceConfEvtMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserMutedInVoiceConfEvtMsg): Unit = { - val event = BbbMsgEvent(msg.header.voiceConf, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } - - def routeUserTalkingInVoiceConfEvtMsg(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { - def deserialize(jsonNode: JsonNode): Option[UserTalkingInVoiceConfEvtMsg] = { - val (result, error) = JsonDeserializer.toBbbCommonMsg[UserTalkingInVoiceConfEvtMsg](jsonNode) - - result match { - case Some(msg) => - Some(msg.asInstanceOf[UserTalkingInVoiceConfEvtMsg]) - case None => - log.error("Failed to deserialize message. error: {} \n msg: ", error, jsonNode) - None - } - } - - def send(envelope: BbbCoreEnvelope, msg: UserTalkingInVoiceConfEvtMsg): Unit = { - val event = BbbMsgEvent(msg.header.voiceConf, BbbCommonEnvCoreMsg(envelope, msg)) - publish(event) - } - - for { - m <- deserialize(jsonNode) - } yield { - send(envelope, m) - } - } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala index bcc266f07626a483ddebb2a19d433bc521d0c075..0937259c281884deb814a26bd2cf863fceef6d7f 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/pubsub/senders/ReceivedJsonMsgHandlerActor.scala @@ -34,31 +34,83 @@ class ReceivedJsonMsgHandlerActor( } yield handle(envJsonNode.envelope, envJsonNode.core) } + def send(channel: String, envelope: BbbCoreEnvelope, msg: BbbCoreMsg): Unit = { + val event = BbbMsgEvent(channel, BbbCommonEnvCoreMsg(envelope, msg)) + publish(event) + } + def handle(envelope: BbbCoreEnvelope, jsonNode: JsonNode): Unit = { log.debug("Route envelope name " + envelope.name) envelope.name match { case CreateMeetingReqMsg.NAME => - routeCreateMeetingReqMsg(envelope, jsonNode) + for { + m <- deserialize[CreateMeetingReqMsg](jsonNode) + } yield { + send(meetingManagerChannel, envelope, m) + } case ValidateAuthTokenReqMsg.NAME => - routeValidateAuthTokenReqMsg(envelope, jsonNode) + for { + m <- deserialize[ValidateAuthTokenReqMsg](jsonNode) + } yield { + send(m.header.meetingId, envelope, m) + } case RegisterUserReqMsg.NAME => - routeRegisterUserReqMsg(envelope, jsonNode) + for { + m <- deserialize[RegisterUserReqMsg](jsonNode) + } yield { + // Route via meeting manager as there is a race condition if we send directly to meeting + // because the meeting actor might not have been created yet. + send(meetingManagerChannel, envelope, m) + } case UserJoinMeetingReqMsg.NAME => - routeUserJoinMeetingReqMsg(envelope, jsonNode) + for { + m <- deserialize[UserJoinMeetingReqMsg](jsonNode) + } yield { + send(m.header.userId, envelope, m) + } case UserBroadcastCamStartMsg.NAME => - routeUserBroadcastCamStartMsg(envelope, jsonNode) + for { + m <- deserialize[UserBroadcastCamStartMsg](jsonNode) + } yield { + val event = BbbMsgEvent(m.header.meetingId, BbbCommonEnvCoreMsg(envelope, m)) + publish(event) + } case UserBroadcastCamStopMsg.NAME => - routeUserBroadcastCamStopMsg(envelope, jsonNode) + for { + m <- deserialize[UserBroadcastCamStopMsg](jsonNode) + } yield { + send(m.header.meetingId, envelope, m) + } case RecordingStartedVoiceConfEvtMsg.NAME => - routeRecordingStartedVoiceConfEvtMsg(envelope, jsonNode) + for { + m <- deserialize[RecordingStartedVoiceConfEvtMsg](jsonNode) + } yield { + send(m.header.voiceConf, envelope, m) + } case UserJoinedVoiceConfEvtMsg.NAME => - routeUserJoinedVoiceConfEvtMsg(envelope, jsonNode) + for { + m <- deserialize[UserJoinedVoiceConfEvtMsg](jsonNode) + } yield { + send(m.header.voiceConf, envelope, m) + } case UserLeftVoiceConfEvtMsg.NAME => - routeUserLeftVoiceConfEvtMsg(envelope, jsonNode) + for { + m <- deserialize[UserLeftVoiceConfEvtMsg](jsonNode) + } yield { + send(m.header.voiceConf, envelope, m) + } case UserMutedInVoiceConfEvtMsg.NAME => - routeUserMutedInVoiceConfEvtMsg(envelope, jsonNode) + for { + m <- deserialize[UserMutedInVoiceConfEvtMsg](jsonNode) + } yield { + send(m.header.voiceConf, envelope, m) + } case UserTalkingInVoiceConfEvtMsg.NAME => - routeUserTalkingInVoiceConfEvtMsg(envelope, jsonNode) + for { + m <- deserialize[UserTalkingInVoiceConfEvtMsg](jsonNode) + } yield { + send(m.header.voiceConf, envelope, m) + } case _ => log.error("Cannot route envelope name " + envelope.name) // do nothing diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingActor.scala index e3b96d93a4420f57b1e68aaef0d9ccc76b2795ff..0260fcdd5e39f5bd663c997b279e687cb9a1f775 100755 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingActor.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingActor.scala @@ -1,6 +1,7 @@ package org.bigbluebutton.client.meeting import akka.actor.{Actor, ActorLogging, Props} +import org.bigbluebutton.client.SystemConfiguration import org.bigbluebutton.client.bus._ import org.bigbluebutton.common2.messages.{BbbCommonEnvJsNodeMsg, MessageTypes} @@ -11,7 +12,9 @@ object MeetingActor { } class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus, - msgToClientEventBus: MsgToClientEventBus) extends Actor with ActorLogging { + msgToClientEventBus: MsgToClientEventBus) + extends Actor with ActorLogging + with SystemConfiguration{ private val userMgr = new UsersManager @@ -69,7 +72,7 @@ class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEv log.debug("**** MeetingActor handleServerMsg " + msg.envelope.name) msgType match { case MessageTypes.DIRECT => handleDirectMessage(msg) - case MessageTypes.BROADCAST => handleBroadcastMessage(msg) + case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg) case MessageTypes.SYSTEM => handleSystemMessage(msg) } } @@ -93,7 +96,7 @@ class MeetingActor(val meetingId: String, msgToAkkaAppsEventBus: MsgToAkkaAppsEv def handleBroadcastMessage(msg: BbbCommonEnvJsNodeMsg): Unit = { // In case we want to handle specific messages. We can do it here. - forwardToUser(msg) + msgToClientEventBus.publish(MsgToClientBusMsg(toClientChannel, BroadcastMsgToMeeting(meetingId, msg))) } def handleSystemMessage(msg: BbbCommonEnvJsNodeMsg): Unit = { diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingManagerActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingManagerActor.scala index 42c663c68cd3d978d2eddca04ef347ab708fa0fd..3fa8f5b944fc42a790bf830c943b62c7e2ddcfea 100755 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingManagerActor.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/MeetingManagerActor.scala @@ -71,7 +71,7 @@ class MeetingManagerActor(msgToAkkaAppsEventBus: MsgToAkkaAppsEventBus, log.debug("**** MeetingManagerActor handleServerMsg " + msg.envelope.name) msgType match { case MessageTypes.DIRECT => handleDirectMessage(msg) - case MessageTypes.BROADCAST => handleBroadcastMessage(msg) + case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg) case MessageTypes.SYSTEM => handleSystemMessage(msg) } } diff --git a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/UserActor.scala b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/UserActor.scala index 8e649282fd60d4c2f48deae818e711df13b55ca5..6eca7821f131ab52bf14a605c4f50c76b9f14949 100755 --- a/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/UserActor.scala +++ b/bbb-apps-common/src/main/scala/org/bigbluebutton/client/meeting/UserActor.scala @@ -108,7 +108,7 @@ class UserActor(val userId: String, log.debug("**** UserActor handleServerMsg " + msg) msgType match { case MessageTypes.DIRECT => handleDirectMessage(msg) - case MessageTypes.BROADCAST => handleBroadcastMessage(msg) + case MessageTypes.BROADCAST_TO_MEETING => handleBroadcastMessage(msg) case MessageTypes.SYSTEM => handleSystemMessage(msg) } } diff --git a/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/client/messaging/ConnectionInvokerService.java b/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/client/messaging/ConnectionInvokerService.java index 14328116877df3d283d1807afb8322a290047610..174a5d605ca8ec7140732c1b50440ca13e5a8c4f 100755 --- a/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/client/messaging/ConnectionInvokerService.java +++ b/bigbluebutton-apps/src/main/java/org/bigbluebutton/red5/client/messaging/ConnectionInvokerService.java @@ -119,6 +119,8 @@ public class ConnectionInvokerService implements IConnectionInvokerService { handleDisconnectAllMessage((DisconnectAllMessage) message); } else if (message instanceof DirectToClientMsg) { handleDirectToClientMsg((DirectToClientMsg) message); + } else if (message instanceof BroadcastToMeetingMsg) { + handleBroadcastToMeetingMsg((BroadcastToMeetingMsg) message); } } @@ -178,6 +180,51 @@ public class ConnectionInvokerService implements IConnectionInvokerService { } } + private void handleBroadcastToMeetingMsg(final BroadcastToMeetingMsg msg) { + if (log.isTraceEnabled()) { + log.trace("Handle broadcast message: " + msg.messageName + " msg=" + msg.json); + } + + Runnable sender = new Runnable() { + public void run() { + IScope meetingScope = getScope(msg.meetingId); + if (meetingScope != null) { + List<Object> params = new ArrayList<Object>(); + params.add(msg.messageName); + params.add(msg.json); + + if (log.isTraceEnabled()) { + log.trace("Broadcast message: " + msg.messageName + " msg=" + msg.json); + } + + ServiceUtils.invokeOnAllScopeConnections(meetingScope, "onMessageFromServer2x", params.toArray(), null); + } + } + }; + + /** + * We need to add a way to cancel sending when the thread is blocked. + * Red5 uses a semaphore to guard the rtmp connection and we've seen + * instances where our thread is blocked preventing us from sending messages + * to other connections. (ralam nov 19, 2015) + */ + long endNanos = System.nanoTime() + SEND_TIMEOUT; + Future<?> f = runExec.submit(sender); + try { + // Only wait for the remaining time budget + long timeLeft = endNanos - System.nanoTime(); + f.get(timeLeft, TimeUnit.NANOSECONDS); + } catch (ExecutionException e) { + log.warn("ExecutionException while sending broadcast message: " + msg.messageName + " msg=" + msg.json); + } catch (InterruptedException e) { + log.warn("Interrupted exception while sending broadcast message: " + msg.messageName + " msg=" + msg.json); + Thread.currentThread().interrupt(); + } catch (TimeoutException e) { + log.warn("Timeout exception while sending broadcast message: " + msg.messageName + " msg=" + msg.json); + f.cancel(true); + } + } + private void handleDisconnectAllMessage(DisconnectAllMessage msg) { IScope meetingScope = bbbAppScope.getContext().resolveScope("bigbluebutton");