Skip to content
Snippets Groups Projects
Commit 391ce4d8 authored by Aron Engineer's avatar Aron Engineer
Browse files

feat: BasicStatusCheck added for redis db, computation added to HeathActor

parent 135f80ba
No related branches found
No related tags found
No related merge requests found
......@@ -41,8 +41,12 @@ object Boot extends App with SystemConfiguration {
val msgSender = new MessageSender(redisPublisher)
val healthzService = HealthzService(system)
val apiService = new ApiService(healthzService)
val redisRecorderActor = system.actorOf(
RedisRecorderActor.props(system, redisConfig),
RedisRecorderActor.props(system, redisConfig, healthzService),
"redisRecorderActor"
)
......@@ -60,7 +64,7 @@ object Boot extends App with SystemConfiguration {
outBus2.subscribe(analyticsActorRef, outBbbMsgMsgChannel)
bbbMsgBus.subscribe(analyticsActorRef, analyticsChannel)
val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW), "bigbluebutton-actor")
val bbbActor = system.actorOf(BigBlueButtonActor.props(system, eventBus, bbbMsgBus, outGW, healthzService), "bigbluebutton-actor")
eventBus.subscribe(bbbActor, meetingManagerChannel)
val redisMessageHandlerActor = system.actorOf(ReceivedJsonMsgHandlerActor.props(bbbMsgBus, incomingJsonMessageBus))
......@@ -80,9 +84,5 @@ object Boot extends App with SystemConfiguration {
"redis-subscriber"
)
val healthz = HealthzService(system)
val apiService = new ApiService(healthz)
val bindingFuture = Http().bindAndHandle(apiService.routes, httpHost, httpPort)
}
package org.bigbluebutton.core
import java.io.{ PrintWriter, StringWriter }
import akka.actor._
import akka.actor.ActorLogging
import akka.actor.SupervisorStrategy.Resume
......@@ -11,27 +10,30 @@ import scala.concurrent.duration._
import org.bigbluebutton.core.bus._
import org.bigbluebutton.core.api._
import org.bigbluebutton.SystemConfiguration
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit
import org.bigbluebutton.common2.msgs._
import org.bigbluebutton.core.running.RunningMeeting
import org.bigbluebutton.core2.RunningMeetings
import org.bigbluebutton.core2.message.senders.MsgBuilder
import org.bigbluebutton.service.HealthzService
object BigBlueButtonActor extends SystemConfiguration {
def props(
system: ActorSystem,
eventBus: InternalEventBus,
bbbMsgBus: BbbMsgRouterEventBus,
outGW: OutMessageGateway
system: ActorSystem,
eventBus: InternalEventBus,
bbbMsgBus: BbbMsgRouterEventBus,
outGW: OutMessageGateway,
healthzService: HealthzService
): Props =
Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW)
Props(classOf[BigBlueButtonActor], system, eventBus, bbbMsgBus, outGW, healthzService)
}
class BigBlueButtonActor(
val system: ActorSystem,
val eventBus: InternalEventBus, val bbbMsgBus: BbbMsgRouterEventBus,
val outGW: OutMessageGateway
val outGW: OutMessageGateway,
val healthzService: HealthzService
) extends Actor
with ActorLogging with SystemConfiguration {
......@@ -166,6 +168,7 @@ class BigBlueButtonActor(
private def handleCheckAlivePingSysMsg(msg: CheckAlivePingSysMsg): Unit = {
val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.timestamp)
healthzService.sendReceiveStatusMessage(System.currentTimeMillis())
outGW.send(event)
}
......
......@@ -10,22 +10,32 @@ import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorSystem
import akka.actor.Props
import org.bigbluebutton.service.HealthzService
import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global
case object CheckRecordingDBStatus
object RedisRecorderActor {
def props(
system: ActorSystem,
redisConfig: RedisConfig
system: ActorSystem,
redisConfig: RedisConfig,
healthzService: HealthzService
): Props =
Props(
classOf[RedisRecorderActor],
system,
redisConfig
redisConfig,
healthzService
)
}
class RedisRecorderActor(
system: ActorSystem,
redisConfig: RedisConfig
system: ActorSystem,
redisConfig: RedisConfig,
healthzService: HealthzService
)
extends RedisStorageProvider(
system,
......@@ -33,6 +43,8 @@ class RedisRecorderActor(
redisConfig
) with Actor with ActorLogging {
system.scheduler.schedule(1.minutes, 1.minutes, self, CheckRecordingDBStatus)
private def record(session: String, message: java.util.Map[java.lang.String, java.lang.String]): Unit = {
redis.recordAndExpire(session, message)
}
......@@ -41,7 +53,7 @@ class RedisRecorderActor(
//=============================
// 2x messages
case msg: BbbCommonEnvCoreMsg => handleBbbCommonEnvCoreMsg(msg)
case CheckRecordingDBStatus => checkRecordingDBStatus()
case _ => // do nothing
}
......@@ -529,4 +541,12 @@ class RedisRecorderActor(
record(meetingId, ev.toMap.asJava)
}
private def checkRecordingDBStatus(): Unit = {
if (redis.checkConnectionStatusBasic)
healthzService.sendRecordingDBStatusMessage(System.currentTimeMillis())
else
log.error("recording database is not available.")
}
}
......@@ -4,12 +4,17 @@ import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration._
import akka.pattern.{ AskTimeoutException, ask }
import org.bigbluebutton.core.BigBlueButtonActor
sealed trait HealthMessage
case class GetBigBlueButtonActorStatus(bbbActor: BigBlueButtonActor) extends HealthMessage
case class SetPubSubReceiveStatus(timestamp: Long) extends HealthMessage
case class SetRecordingDatabaseStatus(timestamp: Long) extends HealthMessage
case object GetHealthMessage extends HealthMessage
case class GetHealthResponseMessage(isHealthy: Boolean) extends HealthMessage
object HealthzService {
......@@ -20,10 +25,10 @@ class HealthzService(system: ActorSystem) {
implicit def executionContext = system.dispatcher
implicit val timeout: Timeout = 2 seconds
val actorRef = system.actorOf(HealthzActor.props())
val healthActor = system.actorOf(HealthzActor.props())
def getHealthz(): Future[GetHealthResponseMessage] = {
val future = actorRef.ask(GetHealthMessage).mapTo[GetHealthResponseMessage]
val future = healthActor.ask(GetHealthMessage).mapTo[GetHealthResponseMessage]
future.recover {
case e: AskTimeoutException => {
GetHealthResponseMessage(isHealthy = false)
......@@ -31,6 +36,13 @@ class HealthzService(system: ActorSystem) {
}
}
def sendReceiveStatusMessage(timestamp: Long): Unit = {
healthActor ! SetPubSubReceiveStatus(timestamp)
}
def sendRecordingDBStatusMessage(timestamp: Long): Unit = {
healthActor ! SetRecordingDatabaseStatus(timestamp)
}
}
object HealthzActor {
......@@ -38,8 +50,27 @@ object HealthzActor {
}
class HealthzActor extends Actor with ActorLogging {
val twoMins = 2 * 60 * 1000
var lastReceivedTimestamp = 0L
var recordingDBResponseTimestamp = 0L
override def receive: Receive = {
case GetHealthMessage => sender ! GetHealthResponseMessage(isHealthy = true)
case _ => println("unexpected message, exception could be raised")
case GetHealthMessage =>
val status: Boolean = (computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins) &&
(computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins)
println(s"lastReceivedTimestamp: $lastReceivedTimestamp")
println(s"recordingDBResponseTimestamp: $recordingDBResponseTimestamp")
sender ! GetHealthResponseMessage(status)
case SetPubSubReceiveStatus(timestamp) =>
lastReceivedTimestamp = timestamp
case SetRecordingDatabaseStatus(timestamp) =>
recordingDBResponseTimestamp = timestamp
case _ => println("unexpected message, exception could be raised")
}
def computeElapsedTimeFromNow(inputTimeStamp: Long): Long = {
System.currentTimeMillis() - inputTimeStamp
}
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import com.sun.org.apache.xpath.internal.operations.Bool;
import io.lettuce.core.api.sync.BaseRedisCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -146,4 +147,11 @@ public class RedisStorageService extends RedisAwareCommunicator {
result = commands.hmset(key, info);
return result;
}
public Boolean checkConnectionStatusBasic() {
BaseRedisCommands command = connection.sync();
String response = command.ping();
return response.equals("PONG");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment