From cae124aed7c39c3e48fd610019096ae81528e85d Mon Sep 17 00:00:00 2001 From: Aron Engineer <aron.engineer1@gmail.com> Date: Thu, 4 Mar 2021 20:29:37 +0000 Subject: [PATCH] feat: pubsub send and receive statuses added to health endpoint --- .../scala/org/bigbluebutton/ApiService.scala | 28 ++++++++-- .../core/BigBlueButtonActor.scala | 3 +- .../core2/message/senders/MsgBuilder.scala | 4 +- .../service/HealthzService.scala | 56 ++++++++++++++----- .../common2/msgs/SystemMsgs.scala | 4 +- .../messaging/messages/KeepAliveReply.java | 10 ++-- .../api/pub/IPublisherService.java | 2 +- .../bigbluebutton/api2/IBbbWebApiGWApp.java | 2 +- .../web/services/KeepAlivePong.java | 10 ++-- .../web/services/KeepAliveService.java | 10 ++-- .../bigbluebutton/api2/BbbWebApiGWApp.scala | 4 +- .../org/bigbluebutton/api2/MsgBuilder.scala | 4 +- .../api2/meeting/OldMeetingMsgHdlrActor.scala | 3 +- 13 files changed, 98 insertions(+), 42 deletions(-) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/ApiService.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/ApiService.scala index df07205d50..c1d3dbad6e 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/ApiService.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/ApiService.scala @@ -3,13 +3,21 @@ package org.bigbluebutton import akka.http.scaladsl.model._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport import akka.http.scaladsl.server.Directives._ -import org.bigbluebutton.service.HealthzService +import org.bigbluebutton.service.{ HealthzService, PubSubReceiveStatus, PubSubSendStatus, RecordingDBSendStatus } import spray.json.DefaultJsonProtocol -case class HealthResponse(isHealthy: Boolean) +case class HealthResponse( + isHealthy: Boolean, + pubsubSendStatus: PubSubSendStatus, + pubsubReceiveStatus: PubSubReceiveStatus, + recordingDbStatus: RecordingDBSendStatus +) trait JsonSupportProtocol extends SprayJsonSupport with DefaultJsonProtocol { - implicit val healthServiceJsonFormat = jsonFormat1(HealthResponse) + implicit val pubSubSendStatusJsonFormat = jsonFormat2(PubSubSendStatus) + implicit val pubSubReceiveStatusJsonFormat = jsonFormat2(PubSubReceiveStatus) + implicit val recordingDbStatusJsonFormat = jsonFormat2(RecordingDBSendStatus) + implicit val healthServiceJsonFormat = jsonFormat4(HealthResponse) } class ApiService(healthz: HealthzService) extends JsonSupportProtocol { @@ -21,9 +29,19 @@ class ApiService(healthz: HealthzService) extends JsonSupportProtocol { onSuccess(future) { case response => if (response.isHealthy) { - complete(StatusCodes.OK, HealthResponse(response.isHealthy)) + complete(StatusCodes.OK, HealthResponse( + response.isHealthy, + response.pubSubSendStatus, + response.pubSubReceiveStatus, + response.recordingDBSendStatus + )) } else { - complete(StatusCodes.ServiceUnavailable, HealthResponse(response.isHealthy)) + complete(StatusCodes.ServiceUnavailable, HealthResponse( + response.isHealthy, + response.pubSubSendStatus, + response.pubSubReceiveStatus, + response.recordingDBSendStatus + )) } } } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala index 4dd5c9cdd3..156be4b27f 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core/BigBlueButtonActor.scala @@ -167,7 +167,8 @@ class BigBlueButtonActor( } private def handleCheckAlivePingSysMsg(msg: CheckAlivePingSysMsg): Unit = { - val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.timestamp) + val event = MsgBuilder.buildCheckAlivePingSysMsg(msg.body.system, msg.body.bbbWebTimestamp, System.currentTimeMillis()) + healthzService.sendSentStatusMessage(msg.body.akkaAppsTimestamp) healthzService.sendReceiveStatusMessage(System.currentTimeMillis()) outGW.send(event) } diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala index e02174ffae..58f591623a 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/core2/message/senders/MsgBuilder.scala @@ -238,10 +238,10 @@ object MsgBuilder { BbbCommonEnvCoreMsg(envelope, event) } - def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = { + def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = { val routing = collection.immutable.HashMap("sender" -> "bbb-apps-akka") val envelope = BbbCoreEnvelope(CheckAlivePongSysMsg.NAME, routing) - val body = CheckAlivePongSysMsgBody(system, timestamp) + val body = CheckAlivePongSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp) val header = BbbCoreBaseHeader(CheckAlivePongSysMsg.NAME) val event = CheckAlivePongSysMsg(header, body) diff --git a/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala b/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala index 20b567d487..7b8e53a8a3 100755 --- a/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala +++ b/akka-bbb-apps/src/main/scala/org/bigbluebutton/service/HealthzService.scala @@ -8,14 +8,27 @@ import scala.concurrent.duration._ import akka.pattern.{ AskTimeoutException, ask } import org.bigbluebutton.core.BigBlueButtonActor +import java.time.{ Instant, LocalDateTime } +import java.util.TimeZone + sealed trait HealthMessage +case object GetHealthMessage extends HealthMessage case class GetBigBlueButtonActorStatus(bbbActor: BigBlueButtonActor) extends HealthMessage case class SetPubSubReceiveStatus(timestamp: Long) extends HealthMessage +case class SetPubSubSentStatus(timestamp: Long) extends HealthMessage case class SetRecordingDatabaseStatus(timestamp: Long) extends HealthMessage -case object GetHealthMessage extends HealthMessage -case class GetHealthResponseMessage(isHealthy: Boolean) extends HealthMessage +case class GetHealthResponseMessage( + isHealthy: Boolean, + pubSubSendStatus: PubSubSendStatus, + pubSubReceiveStatus: PubSubReceiveStatus, + recordingDBSendStatus: RecordingDBSendStatus +) extends HealthMessage + +case class PubSubSendStatus(status: Boolean, timestamp: String) +case class PubSubReceiveStatus(status: Boolean, timestamp: String) +case class RecordingDBSendStatus(status: Boolean, timestamp: String) object HealthzService { def apply(system: ActorSystem) = new HealthzService(system) @@ -31,11 +44,15 @@ class HealthzService(system: ActorSystem) { val future = healthActor.ask(GetHealthMessage).mapTo[GetHealthResponseMessage] future.recover { case e: AskTimeoutException => { - GetHealthResponseMessage(isHealthy = false) + GetHealthResponseMessage(false, null, null, null) } } } + def sendSentStatusMessage(timestamp: Long): Unit = { + healthActor ! SetPubSubSentStatus(timestamp) + } + def sendReceiveStatusMessage(timestamp: Long): Unit = { healthActor ! SetPubSubReceiveStatus(timestamp) } @@ -43,6 +60,7 @@ class HealthzService(system: ActorSystem) { def sendRecordingDBStatusMessage(timestamp: Long): Unit = { healthActor ! SetRecordingDatabaseStatus(timestamp) } + } object HealthzActor { @@ -50,19 +68,26 @@ object HealthzActor { } class HealthzActor extends Actor with ActorLogging { - val twoMins = 2 * 60 * 1000 - var lastReceivedTimestamp = 0L - var recordingDBResponseTimestamp = 0L + val twoMins: Int = 2 * 60 * 1000 + var lastSentTimestamp: Long = 0L + var lastReceivedTimestamp: Long = 0L + var recordingDBResponseTimestamp: Long = 0L override def receive: Receive = { case GetHealthMessage => - val status: Boolean = (computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins) && - (computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins) - println(s"lastReceivedTimestamp: $lastReceivedTimestamp") - println(s"recordingDBResponseTimestamp: $recordingDBResponseTimestamp") - - sender ! GetHealthResponseMessage(status) - + val c1: Boolean = computeElapsedTimeFromNow(lastSentTimestamp) < 30000 + val c2: Boolean = computeElapsedTimeFromNow(lastReceivedTimestamp) < twoMins + val c3: Boolean = computeElapsedTimeFromNow(recordingDBResponseTimestamp) < twoMins + val status: Boolean = c1 && c2 && c3 + + sender ! GetHealthResponseMessage( + status, + PubSubSendStatus(c1, convertLongTimestampToDateTimeString(lastSentTimestamp)), + PubSubReceiveStatus(c2, convertLongTimestampToDateTimeString(lastReceivedTimestamp)), + RecordingDBSendStatus(c3, convertLongTimestampToDateTimeString(recordingDBResponseTimestamp)) + ) + case SetPubSubSentStatus(timestamp) => + lastSentTimestamp = timestamp case SetPubSubReceiveStatus(timestamp) => lastReceivedTimestamp = timestamp case SetRecordingDatabaseStatus(timestamp) => @@ -73,4 +98,9 @@ class HealthzActor extends Actor with ActorLogging { def computeElapsedTimeFromNow(inputTimeStamp: Long): Long = { System.currentTimeMillis() - inputTimeStamp } + + def convertLongTimestampToDateTimeString(inputTimestamp: Long): String = { + LocalDateTime.ofInstant(Instant.ofEpochMilli(inputTimestamp), TimeZone.getDefault.toZoneId).toString + } + } \ No newline at end of file diff --git a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala index 9ffe660e4e..75f0b1bb5a 100755 --- a/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala +++ b/bbb-common-message/src/main/scala/org/bigbluebutton/common2/msgs/SystemMsgs.scala @@ -164,14 +164,14 @@ case class CheckAlivePingSysMsg( header: BbbCoreBaseHeader, body: CheckAlivePingSysMsgBody ) extends BbbCoreMsg -case class CheckAlivePingSysMsgBody(system: String, timestamp: Long) +case class CheckAlivePingSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long) object CheckAlivePongSysMsg { val NAME = "CheckAlivePongSysMsg" } case class CheckAlivePongSysMsg( header: BbbCoreBaseHeader, body: CheckAlivePongSysMsgBody ) extends BbbCoreMsg -case class CheckAlivePongSysMsgBody(system: String, timestamp: Long) +case class CheckAlivePongSysMsgBody(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long) object RecordingChapterBreakSysMsg { val NAME = "RecordingChapterBreakSysMsg" } case class RecordingChapterBreakSysMsg( diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java index d999e45d68..6256c96816 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api/messaging/messages/KeepAliveReply.java @@ -3,10 +3,12 @@ package org.bigbluebutton.api.messaging.messages; public class KeepAliveReply implements IMessage { public final String system; - public final Long timestamp; - - public KeepAliveReply(String system, Long timestamp) { + public final Long bbbWebTimestamp; + public final Long akkaAppsTimestamp; + + public KeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) { this.system = system; - this.timestamp = timestamp; + this.bbbWebTimestamp = bbbWebTimestamp; + this.akkaAppsTimestamp = akkaAppsTimestamp; } } diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api/pub/IPublisherService.java b/bbb-common-web/src/main/java/org/bigbluebutton/api/pub/IPublisherService.java index 7b5ff0c5b5..28261c318e 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api/pub/IPublisherService.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api/pub/IPublisherService.java @@ -19,6 +19,6 @@ public interface IPublisherService { void send(String channel, String message); void registerUser(String meetingID, String internalUserId, String fullname, String role, String externUserID, String authToken, String avatarURL, Boolean guest, Boolean authed); - void sendKeepAlive(String system, Long timestamp); + void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp); void sendStunTurnInfo(String meetingId, String internalUserId, Set<StunServer> stuns, Set<TurnEntry> turns); } diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java b/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java index 4f9576646d..3699525f05 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/api2/IBbbWebApiGWApp.java @@ -42,7 +42,7 @@ public interface IBbbWebApiGWApp { void destroyMeeting(DestroyMeetingMessage msg); void endMeeting(EndMeetingMessage msg); - void sendKeepAlive(String system, Long timestamp); + void sendKeepAlive(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp); void publishedRecording(PublishedRecordingMessage msg); void unpublishedRecording(UnpublishedRecordingMessage msg); void deletedRecording(DeletedRecordingMessage msg); diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAlivePong.java b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAlivePong.java index 2f928bfa71..2aca75eb14 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAlivePong.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAlivePong.java @@ -3,10 +3,12 @@ package org.bigbluebutton.web.services; public class KeepAlivePong implements KeepAliveMessage { public final String system; - public final Long timestamp; - - public KeepAlivePong(String system, Long timestamp) { + public final Long bbbWebTimestamp; + public final Long akkaAppsTimestamp; + + public KeepAlivePong(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) { this.system = system; - this.timestamp = timestamp; + this.bbbWebTimestamp = bbbWebTimestamp; + this.akkaAppsTimestamp = akkaAppsTimestamp; } } diff --git a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java index 3c73e65966..ada7b02742 100755 --- a/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java +++ b/bbb-common-web/src/main/java/org/bigbluebutton/web/services/KeepAliveService.java @@ -51,6 +51,7 @@ public class KeepAliveService implements MessageListener { private BlockingQueue<KeepAliveMessage> messages = new LinkedBlockingQueue<KeepAliveMessage>(); private Long lastKeepAliveMessage = 0L; + private Long lastAkkaAppsTimestamp = 0L; private static final String SYSTEM = "BbbWeb"; @@ -123,7 +124,7 @@ public class KeepAliveService implements MessageListener { } private void processPing(KeepAlivePing msg) { - gw.sendKeepAlive(SYSTEM, System.currentTimeMillis()); + gw.sendKeepAlive(SYSTEM, System.currentTimeMillis(), lastAkkaAppsTimestamp); Boolean akkaAppsIsAvailable = available; if (lastKeepAliveMessage != 0 && (System.currentTimeMillis() - lastKeepAliveMessage > 30000)) { @@ -141,12 +142,13 @@ public class KeepAliveService implements MessageListener { } lastKeepAliveMessage = System.currentTimeMillis(); + lastAkkaAppsTimestamp = msg.akkaAppsTimestamp; available = true; } - private void handleKeepAliveReply(String system, Long timestamp) { + private void handleKeepAliveReply(String system, Long bbbWebTimestamp, Long akkaAppsTimestamp) { if (SYSTEM.equals(system)) { - KeepAlivePong pong = new KeepAlivePong(system, timestamp); + KeepAlivePong pong = new KeepAlivePong(system, bbbWebTimestamp, akkaAppsTimestamp); queueMessage(pong); } } @@ -155,7 +157,7 @@ public class KeepAliveService implements MessageListener { public void handle(IMessage message) { if (message instanceof KeepAliveReply) { KeepAliveReply msg = (KeepAliveReply) message; - handleKeepAliveReply(msg.system, msg.timestamp); + handleKeepAliveReply(msg.system, msg.bbbWebTimestamp, msg.akkaAppsTimestamp); } } } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala index ca536f6ac2..7c59f9e3ba 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/BbbWebApiGWApp.scala @@ -255,8 +255,8 @@ class BbbWebApiGWApp( msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event)) } - def sendKeepAlive(system: String, timestamp: java.lang.Long): Unit = { - val event = MsgBuilder.buildCheckAlivePingSysMsg(system, timestamp.longValue()) + def sendKeepAlive(system: String, bbbWebTimestamp: java.lang.Long, akkaAppsTimestamp: java.lang.Long): Unit = { + val event = MsgBuilder.buildCheckAlivePingSysMsg(system, bbbWebTimestamp.longValue(), akkaAppsTimestamp.longValue()) msgToAkkaAppsEventBus.publish(MsgToAkkaApps(toAkkaAppsChannel, event)) } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/MsgBuilder.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/MsgBuilder.scala index 0429f9f112..945cce0b8a 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/MsgBuilder.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/MsgBuilder.scala @@ -64,11 +64,11 @@ object MsgBuilder { BbbCommonEnvCoreMsg(envelope, req) } - def buildCheckAlivePingSysMsg(system: String, timestamp: Long): BbbCommonEnvCoreMsg = { + def buildCheckAlivePingSysMsg(system: String, bbbWebTimestamp: Long, akkaAppsTimestamp: Long): BbbCommonEnvCoreMsg = { val routing = collection.immutable.HashMap("sender" -> "bbb-web") val envelope = BbbCoreEnvelope(CheckAlivePingSysMsg.NAME, routing) val header = BbbCoreBaseHeader(CheckAlivePingSysMsg.NAME) - val body = CheckAlivePingSysMsgBody(system, timestamp) + val body = CheckAlivePingSysMsgBody(system, bbbWebTimestamp, akkaAppsTimestamp) val req = CheckAlivePingSysMsg(header, body) BbbCommonEnvCoreMsg(envelope, req) } diff --git a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/OldMeetingMsgHdlrActor.scala b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/OldMeetingMsgHdlrActor.scala index 8c70b881e6..1eb8d18fe6 100755 --- a/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/OldMeetingMsgHdlrActor.scala +++ b/bbb-common-web/src/main/scala/org/bigbluebutton/api2/meeting/OldMeetingMsgHdlrActor.scala @@ -91,7 +91,8 @@ class OldMeetingMsgHdlrActor(val olgMsgGW: OldMessageReceivedGW) } def handleCheckAlivePongSysMsg(msg: CheckAlivePongSysMsg): Unit = { - olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.timestamp)) + olgMsgGW.handle(new org.bigbluebutton.api.messaging.messages.KeepAliveReply(msg.body.system, msg.body.bbbWebTimestamp, + msg.body.akkaAppsTimestamp)) } def handleUserJoinedMeetingEvtMsg(msg: UserJoinedMeetingEvtMsg): Unit = { -- GitLab