diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala index f28b7066260e8eeaad2f5b9f5c9f424ecbbe3efb..47310b3e2ae1f4c597dc006a0ef1cf25157e44dd 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/Boot.scala @@ -41,8 +41,12 @@ object Boot extends App with SystemConfiguration { val msgSender = new MessageSender(redisPublisher) + val healthzService = HealthzService(system) + + val apiService = new ApiService(healthzService) + val redisRecorderActor = system.actorOf( - RedisRecorderActor.props(system, redisConfig), + RedisRecorderActor.props(system, redisConfig, healthzService), "redisRecorderActor" ) @@ -60,7 +64,7 @@ object Boot extends App with SystemConfiguration { outBus2.subscribe(analyticsActorRef, outBbbMsgMsgChannel) bbbMsgBus.subscribe(analyticsActorRef, analyticsChannel) - val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW), "bigbluebutton-actor") + val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW, healthzService), "bigbluebutton-actor") eventBus.subscribe(bbbActor, meetingManagerChannel) val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus)) @@ -80,9 +84,5 @@ object Boot extends App with SystemConfiguration { "redis-subscriber" ) - val healthz = HealthzService(system) - - val apiService = new ApiService(healthz) - val bindingFuture = Http().bindAndHandle(apiService.routes, httpHost, httpPort) } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala index 4b9cefdc539619d74b56053320195903e123be42..4dd5c9cdd3130959b56b46bef0c540e38f925e8d 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala @@ -1,7 +1,6 @@ package org.bigbluebutton.core import java.io.{ PrintWriter, StringWriter } - import akka.actor._ import akka.actor.ActorLogging import akka.actor.SupervisorStrategy.Resume @@ -11,27 +10,30 @@ import scala.concurrent.duration._ import org.bigbluebutton.core.bus._ import org.bigbluebutton.core.api._ import org.bigbluebutton.SystemConfiguration -import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit import org.bigbluebutton.common2.msgs._ import org.bigbluebutton.core.running.RunningMeeting import org.bigbluebutton.core2.RunningMeetings import org.bigbluebutton.core2.message.senders.MsgBuilder +import org.bigbluebutton.service.HealthzService object BigBlueButtonActor extends SystemConfiguration { def props( - system: ActorSystem, - eventBus: InternalEventBus, - bbbMsgBus: BbbMsgRouterEventBus, - outGW: OutMessageGateway + system: ActorSystem, + eventBus: InternalEventBus, + bbbMsgBus: BbbMsgRouterEventBus, + outGW: OutMessageGateway, + healthzService: HealthzService ): Props = - Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW) + Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW, healthzService) } class BigBlueButtonActor( val system: ActorSystem, val eventBus: InternalEventBus, val bbbMsgBus: BbbMsgRouterEventBus, - val outGW: OutMessageGateway + val outGW: OutMessageGateway, + val healthzService: HealthzService ) extends Actor with ActorLogging with SystemConfiguration { @@ -166,6 +168,7 @@ class BigBlueButtonActor( private def handleCheckAlivePingSysMsg(msg: CheckAlivePingSysMsg): Unit = { val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.timestamp) + healthzService.sendReceiveStatusMessage(System.currentTimeMillis()) outGW.send(event) } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala index 3aee1d95028a9d06a5b26b1a70d17fa0da6a3e8d..2a27a05ef2b8cc1267bdc933114cb6c086aa8647 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/endpoint/redis/RedisRecorderActor.scala @@ -10,22 +10,32 @@ import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorSystem import akka.actor.Props +import org.bigbluebutton.service.HealthzService + +import scala.concurrent.duration._ +import scala.concurrent._ +import ExecutionContext.Implicits.global + +case object CheckRecordingDBStatus object RedisRecorderActor { def props( - system: ActorSystem, - redisConfig: RedisConfig + system: ActorSystem, + redisConfig: RedisConfig, + healthzService: HealthzService ): Props = Props( classOf[RedisRecorderActor], system, - redisConfig + redisConfig, + healthzService ) } class RedisRecorderActor( - system: ActorSystem, - redisConfig: RedisConfig + system: ActorSystem, + redisConfig: RedisConfig, + healthzService: HealthzService ) extends RedisStorageProvider( system, @@ -33,6 +43,8 @@ class RedisRecorderActor( redisConfig ) with Actor with ActorLogging { + system.scheduler.schedule(1.minutes, 1.minutes, self, CheckRecordingDBStatus) + private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = { redis.recordAndExpire(session, message) } @@ -41,7 +53,7 @@ class RedisRecorderActor( //============================= // 2x messages case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg) - + case CheckRecordingDBStatus => checkRecordingDBStatus() case _ => // do nothing } @@ -529,4 +541,12 @@ class RedisRecorderActor( record(meetingId, ev.toMap.asJava) } + + private def checkRecordingDBStatus(): Unit = { + if (redis.checkConnectionStatusBasic) + healthzService.sendRecordingDBStatusMessage(System.currentTimeMillis()) + else + log.error("recording database is not available.") + } + } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala index 2e07689b4163e0cf94f56cd3bf6bcbd8f2a5e2f2..20b567d487fb922aef7627e03d2d8456a2d8ea3d 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala @@ -4,12 +4,17 @@ import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } import akka.util.Timeout import scala.concurrent.Future -import scala.concurrent.duration.DurationInt +import scala.concurrent.duration._ import akka.pattern.{ AskTimeoutException, ask } +import org.bigbluebutton.core.BigBlueButtonActor sealed trait HealthMessage +case class GetBigBlueButtonActorStatus(bbbActor: BigBlueButtonActor) extends HealthMessage +case class SetPubSubReceiveStatus(timestamp: Long) extends HealthMessage +case class SetRecordingDatabaseStatus(timestamp: Long) extends HealthMessage case object GetHealthMessage extends HealthMessage + case class GetHealthResponseMessage(isHealthy: Boolean) extends HealthMessage object HealthzService { @@ -20,10 +25,10 @@ class HealthzService(system: ActorSystem) { implicit def executionContext = system.dispatcher implicit val timeout: Timeout = 2 seconds - val actorRef = system.actorOf(HealthzActor.props()) + val healthActor = system.actorOf(HealthzActor.props()) def getHealthz(): Future[GetHealthResponseMessage] = { - val future = actorRef.ask(GetHealthMessage).mapTo[GetHealthResponseMessage] + val future = healthActor.ask(GetHealthMessage).mapTo[GetHealthResponseMessage] future.recover { case e: AskTimeoutException => { GetHealthResponseMessage(isHealthy = false) @@ -31,6 +36,13 @@ class HealthzService(system: ActorSystem) { } } + def sendReceiveStatusMessage(timestamp: Long): Unit = { + healthActor ! SetPubSubReceiveStatus(timestamp) + } + + def sendRecordingDBStatusMessage(timestamp: Long): Unit = { + healthActor ! SetRecordingDatabaseStatus(timestamp) + } } object HealthzActor { @@ -38,8 +50,27 @@ object HealthzActor { } class HealthzActor extends Actor with ActorLogging { + val twoMins = 2 * 60 * 1000 + var lastReceivedTimestamp = 0L + var recordingDBResponseTimestamp = 0L + override def receive: Receive = { - case GetHealthMessage => sender ! GetHealthResponseMessage(isHealthy = true) - case _ => println("unexpected message, exception could be raised") + case GetHealthMessage => + val status: Boolean = (computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins) && + (computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins) + println(s"lastReceivedTimestamp: $lastReceivedTimestamp") + println(s"recordingDBResponseTimestamp: $recordingDBResponseTimestamp") + + sender ! GetHealthResponseMessage(status) + + case SetPubSubReceiveStatus(timestamp) => + lastReceivedTimestamp = timestamp + case SetRecordingDatabaseStatus(timestamp) => + recordingDBResponseTimestamp = timestamp + case _ => println("unexpected message, exception could be raised") + } + + def computeElapsedTimeFromNow(inputTimeStamp: Long): Long = { + System.currentTimeMillis() - inputTimeStamp } } \ No newline at end of file diff --git a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java index 00506598398feaf6b6fa7892f4b0cbc4d08649e3..7491996c46368805fa9739db5c7461005910819a 100755 --- a/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java +++ b/bbb-common-message/src/main/java/org/bigbluebutton/common2/redis/RedisStorageService.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import com.sun.org.apache.xpath.internal.operations.Bool; +import io.lettuce.core.api.sync.BaseRedisCommands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,4 +147,11 @@ public class RedisStorageService extends RedisAwareCommunicator { result = commands.hmset(key, info); return result; } + + public Boolean checkConnectionStatusBasic() { + BaseRedisCommands command = connection.sync(); + String response = command.ping(); + + return response.equals("PONG"); + } }