diff --git a/bbb-webhooks/.gitignore b/bbb-webhooks/.gitignore index a058abd33c561cca7442c4b4b234a4d8c2a12552..7a49c1e1ace999bc53bbafaa40f4d95e5bddc69d 100644 --- a/bbb-webhooks/.gitignore +++ b/bbb-webhooks/.gitignore @@ -3,4 +3,5 @@ *.log node_modules/ config_local.coffee +config_local.js log/* diff --git a/bbb-webhooks/app.js b/bbb-webhooks/app.js index e21c144726a3ec97f2bec194128054190ea7c030..0a47b276f1499a1f7d61c6fa9042e868738bd772 100755 --- a/bbb-webhooks/app.js +++ b/bbb-webhooks/app.js @@ -1,7 +1,5 @@ // This is a simple wrapper to run the app with 'node app.js' -require("coffee-script/register"); - -Application = require('./application.coffee'); +Application = require('./application.js'); application = new Application(); application.start(); diff --git a/bbb-webhooks/application.coffee b/bbb-webhooks/application.coffee deleted file mode 100644 index 600c3367fb9ae3f8d165ab68e729161ef0be2c0c..0000000000000000000000000000000000000000 --- a/bbb-webhooks/application.coffee +++ /dev/null @@ -1,21 +0,0 @@ -config = require("./config") -Hook = require("./hook") -IDMapping = require("./id_mapping") -WebHooks = require("./web_hooks") -WebServer = require("./web_server") - -# Class that defines the application. Listens for events on redis and starts the -# process to perform the callback calls. -# TODO: add port (-p) and log level (-l) to the command line args -module.exports = class Application - - constructor: -> - @webHooks = new WebHooks() - @webServer = new WebServer() - - start: -> - Hook.initialize => - IDMapping.initialize => - @webServer.start(config.server.port) - @webServer.createPermanent() - @webHooks.start() diff --git a/bbb-webhooks/application.js b/bbb-webhooks/application.js new file mode 100644 index 0000000000000000000000000000000000000000..d805395330bc1fcf46b86ade3852dacacf0f863c --- /dev/null +++ b/bbb-webhooks/application.js @@ -0,0 +1,28 @@ +"use strict"; +let Application; +const config = require("./config.js"); +const Hook = require("./hook.js"); +const IDMapping = require("./id_mapping.js"); +const WebHooks = require("./web_hooks.js"); +const WebServer = require("./web_server.js"); + +// Class that defines the application. Listens for events on redis and starts the +// process to perform the callback calls. +// TODO: add port (-p) and log level (-l) to the command line args +module.exports = (Application = class Application { + + constructor() { + this.webHooks = new WebHooks(); + this.webServer = new WebServer(); + } + + start() { + Hook.initialize(() => { + IDMapping.initialize(() => { + this.webServer.start(config.server.port); + this.webServer.createPermanent(); + this.webHooks.start(); + }); + }); + } +}); diff --git a/bbb-webhooks/callback_emitter.coffee b/bbb-webhooks/callback_emitter.coffee deleted file mode 100644 index be086e1277f8b248fc63c738156ef8e1769800a9..0000000000000000000000000000000000000000 --- a/bbb-webhooks/callback_emitter.coffee +++ /dev/null @@ -1,90 +0,0 @@ -_ = require('lodash') -request = require("request") -url = require('url') -EventEmitter = require('events').EventEmitter - -config = require("./config") -Logger = require("./logger") -Utils = require("./utils") - -# Use to perform a callback. Will try several times until the callback is -# properly emitted and stop when successful (or after a given number of tries). -# Used to emit a single callback. Destroy it and create a new class for a new callback. -# Emits "success" on success, "failure" on error and "stopped" when gave up trying -# to perform the callback. -module.exports = class CallbackEmitter extends EventEmitter - - constructor: (@callbackURL, @message, @backupURL) -> - @nextInterval = 0 - @timestap = 0 - @permanent = false - - start: (permanent) -> - @timestamp = new Date().getTime() - @nextInterval = 0 - @permanent = permanent - @_scheduleNext 0 - - _scheduleNext: (timeout) -> - setTimeout( => - @_emitMessage (error, result) => - if not error? and result - @emit "success" - else - @emit "failure", error - - # get the next interval we have to wait and schedule a new try - interval = config.hooks.retryIntervals[@nextInterval] - if interval? - Logger.warn "[Emitter] trying the callback again in #{interval/1000.0} secs" - @nextInterval++ - @_scheduleNext(interval) - - # no intervals anymore, time to give up - else - @nextInterval = if not @permanent then 0 else 8 # Reset interval to permanent hooks - # If a hook has backup URLs for the POSTS, use them after a few failed attempts - if @backupURL? and @permanent then @backupURL.push(@backupURL[0]); @backupURL.shift(); @callbackURL = @backupURL[0] - @_scheduleNext(interval) if @permanent - @emit "stopped" if not @permanent - - , timeout) - - _emitMessage: (callback) -> - # data to be sent - # note: keep keys in alphabetical order - data = - event: @message - timestamp: @timestamp - - # calculate the checksum - checksum = Utils.checksum("#{@callbackURL}#{JSON.stringify(data)}#{config.bbb.sharedSecret}") - - # get the final callback URL, including the checksum - urlObj = url.parse(@callbackURL, true) - callbackURL = @callbackURL - callbackURL += if _.isEmpty(urlObj.search) then "?" else "&" - callbackURL += "checksum=#{checksum}" - - requestOptions = - followRedirect: true - maxRedirects: 10 - uri: callbackURL - method: "POST" - form: data - - request requestOptions, (error, response, body) -> - if error? or not (response?.statusCode >= 200 and response?.statusCode < 300) - Logger.warn "[Emitter] error in the callback call to: [#{requestOptions.uri}] for #{simplifiedEvent(data.event)}", "error:", error, "status:", response?.statusCode - callback error, false - else - Logger.info "[Emitter] successful callback call to: [#{requestOptions.uri}] for #{simplifiedEvent(data.event)}" - callback null, true - -# A simple string that identifies the event -simplifiedEvent = (event) -> - try - eventJs = JSON.parse(event) - "event: { name: #{eventJs.data?.id}, timestamp: #{eventJs.data.event?.ts} }" - catch e - "event: #{event}" diff --git a/bbb-webhooks/callback_emitter.js b/bbb-webhooks/callback_emitter.js new file mode 100644 index 0000000000000000000000000000000000000000..edf4b2488ea870231b34a238abdb4759d0d99712 --- /dev/null +++ b/bbb-webhooks/callback_emitter.js @@ -0,0 +1,111 @@ +"use strict"; +let CallbackEmitter; +const _ = require('lodash'); +const request = require("request"); +const url = require('url'); +const EventEmitter = require('events').EventEmitter; + +const config = require("./config.js"); +const Logger = require("./logger.js"); +const Utils = require("./utils.js"); + +// Use to perform a callback. Will try several times until the callback is +// properly emitted and stop when successful (or after a given number of tries). +// Used to emit a single callback. Destroy it and create a new class for a new callback. +// Emits "success" on success, "failure" on error and "stopped" when gave up trying +// to perform the callback. +module.exports = (CallbackEmitter = class CallbackEmitter extends EventEmitter { + + constructor(callbackURL, message, backupURL) { + super(); + this.callbackURL = callbackURL; + this.message = message; + this.backupURL = backupURL; + this.nextInterval = 0; + this.timestap = 0; + this.permanent = false; + } + + start(permanent) { + this.timestamp = new Date().getTime(); + this.nextInterval = 0; + this.permanent = permanent; + this._scheduleNext(0); + } + + _scheduleNext(timeout) { + setTimeout( () => { + this._emitMessage((error, result) => { + if ((error == null) && result) { + this.emit("success"); + } else { + this.emit("failure", error); + + // get the next interval we have to wait and schedule a new try + const interval = config.hooks.retryIntervals[this.nextInterval]; + if (interval != null) { + Logger.warn(`[Emitter] trying the callback again in ${interval/1000.0} secs`); + this.nextInterval++; + this._scheduleNext(interval); + + // no intervals anymore, time to give up + } else { + this.nextInterval = !this.permanent ? 0 : 8; // Reset interval to permanent hooks + // If a hook has backup URLs for the POSTS, use them after a few failed attempts + if ((this.backupURL != null) && this.permanent) { this.backupURL.push(this.backupURL[0]); this.backupURL.shift(); this.callbackURL = this.backupURL[0]; } + if (this.permanent) { this._scheduleNext(interval); } + if (!this.permanent) { return this.emit("stopped"); } + } + } + }); + } + + , timeout); + } + + _emitMessage(callback) { + // data to be sent + // note: keep keys in alphabetical order + const data = { + event: this.message, + timestamp: this.timestamp + }; + + // calculate the checksum + const checksum = Utils.checksum(`${this.callbackURL}${JSON.stringify(data)}${config.bbb.sharedSecret}`); + + // get the final callback URL, including the checksum + const urlObj = url.parse(this.callbackURL, true); + let callbackURL = this.callbackURL; + callbackURL += _.isEmpty(urlObj.search) ? "?" : "&"; + callbackURL += `checksum=${checksum}`; + + const requestOptions = { + followRedirect: true, + maxRedirects: 10, + uri: callbackURL, + method: "POST", + form: data + }; + + request(requestOptions, function(error, response, body) { + if ((error != null) || !(((response != null ? response.statusCode : undefined) >= 200) && ((response != null ? response.statusCode : undefined) < 300))) { + Logger.warn(`[Emitter] error in the callback call to: [${requestOptions.uri}] for ${simplifiedEvent(data.event)}`, "error:", error, "status:", response != null ? response.statusCode : undefined); + callback(error, false); + } else { + Logger.info(`[Emitter] successful callback call to: [${requestOptions.uri}] for ${simplifiedEvent(data.event)}`); + callback(null, true); + } + }); + } +}); + +// A simple string that identifies the event +var simplifiedEvent = function(event) { + try { + const eventJs = JSON.parse(event); + return `event: { name: ${(eventJs.data != null ? eventJs.data.id : undefined)}, timestamp: ${(eventJs.data.event != null ? eventJs.data.event.ts : undefined)} }`; + } catch (e) { + return `event: ${event}`; + } +}; diff --git a/bbb-webhooks/config.coffee b/bbb-webhooks/config.coffee deleted file mode 100644 index 64ea1068044fe039fbce9c1584c2640fcced18dd..0000000000000000000000000000000000000000 --- a/bbb-webhooks/config.coffee +++ /dev/null @@ -1,90 +0,0 @@ -# Global configuration file - -# load the local configs -config = require("./config_local") - -# BigBlueButton configs -config.bbb or= {} -config.bbb.sharedSecret or= "ac8821d35c447bb3b959ca8fa05b1d3f" -config.bbb.apiPath or= "/bigbluebutton/api" - -# Web server configs -config.server or= {} -config.server.port or= 3005 - -# Web hooks configs -config.hooks or= {} -config.hooks.pchannel or= "bigbluebutton:*" -# IP where aggr will be hosted -config.hooks.aggr or= [] -config.hooks.queueSize or= 10000 -config.hooks.getRaw or= false - -config.webhooks or= {} -config.webhooks.rawPath or= "payload" -config.webhooks.meetingId or= "meeting_id" - -# Retry intervals for failed attempts for perform callback calls. -# In ms. Totals to around 5min. -config.hooks.retryIntervals = [ - 100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000 -] - -# Mappings of internal to external meeting IDs -config.mappings = {} -config.mappings.cleanupInterval = 10000 # 10 secs, in ms -config.mappings.timeout = 1000*60*60*24 # 24 hours, in ms - -# Redis -config.redis = {} -config.redis.keys = {} -config.redis.keys.hook = (id) -> "bigbluebutton:webhooks:hook:#{id}" -config.redis.keys.hooks = "bigbluebutton:webhooks:hooks" -config.redis.keys.mappings = "bigbluebutton:webhooks:mappings" -config.redis.keys.mapping = (id) -> "bigbluebutton:webhooks:mapping:#{id}" -config.redis.keys.events = (id) -> "bigbluebutton:webhooks:events:#{id}" - -config.api = {} -config.api.responses = {} -config.api.responses.failure = (key, msg) -> - "<response> \ - <returncode>FAILED</returncode> \ - <messageKey>#{key}</messageKey> \ - <message>#{msg}</message> \ - </response>" -config.api.responses.checksumError = - config.api.responses.failure("checksumError", "You did not pass the checksum security check.") - -config.api.responses.createSuccess = (id, permanent, getRaw) -> - "<response> \ - <returncode>SUCCESS</returncode> \ - <hookID>#{id}</hookID> \ - <permanentHook>#{permanent}</permanentHook> \ - <rawData>#{getRaw}</rawData> \ - </response>" -config.api.responses.createFailure = - config.api.responses.failure("createHookError", "An error happened while creating your hook. Check the logs.") -config.api.responses.createDuplicated = (id) -> - "<response> \ - <returncode>SUCCESS</returncode> \ - <hookID>#{id}</hookID> \ - <messageKey>duplicateWarning</messageKey> \ - <message>There is already a hook for this callback URL.</message> \ - </response>" - -config.api.responses.destroySuccess = - "<response> \ - <returncode>SUCCESS</returncode> \ - <removed>true</removed> \ - </response>" -config.api.responses.destroyFailure = - config.api.responses.failure("destroyHookError", "An error happened while removing your hook. Check the logs.") -config.api.responses.destroyNoHook = - config.api.responses.failure("destroyMissingHook", "The hook informed was not found.") - -config.api.responses.missingParamCallbackURL = - config.api.responses.failure("missingParamCallbackURL", "You must specify a callbackURL in the parameters.") -config.api.responses.missingParamHookID = - config.api.responses.failure("missingParamHookID", "You must specify a hookID in the parameters.") - -module.exports = config diff --git a/bbb-webhooks/config.js b/bbb-webhooks/config.js new file mode 100644 index 0000000000000000000000000000000000000000..eeef45abd477d50716de66887b864dbad28f6f55 --- /dev/null +++ b/bbb-webhooks/config.js @@ -0,0 +1,94 @@ +// Global configuration file +"use strict"; + +// load the local configs +const config = require("./config_local.js"); + +// BigBlueButton configs +if (!config.bbb) { config.bbb = {}; } +if (!config.bbb.sharedSecret) { config.bbb.sharedSecret = "ac8821d35c447bb3b959ca8fa05b1d3f"; } +if (!config.bbb.apiPath) { config.bbb.apiPath = "/bigbluebutton/api"; } + +// Web server configs +if (!config.server) { config.server = {}; } +if (!config.server.port) { config.server.port = 3005; } + +// Web hooks configs +if (!config.hooks) { config.hooks = {}; } +if (!config.hooks.pchannel) { config.hooks.pchannel = "bigbluebutton:*"; } +// IP where aggr will be hosted +if (!config.hooks.aggr) { config.hooks.aggr = []; } +if (!config.hooks.queueSize) { config.hooks.queueSize = 10000; } +if (!config.hooks.getRaw) { config.hooks.getRaw = false; } + +if (!config.webhooks) { config.webhooks = {}; } +if (!config.webhooks.rawPath) { config.webhooks.rawPath = "payload"; } +if (!config.webhooks.meetingID) { config.webhooks.meetingID = "meeting_id"; } + +// Retry intervals for failed attempts for perform callback calls. +// In ms. Totals to around 5min. +config.hooks.retryIntervals = [ + 100, 500, 1000, 2000, 4000, 8000, 10000, 30000, 60000, 60000, 60000, 60000 +]; + +// Mappings of internal to external meeting IDs +config.mappings = {}; +config.mappings.cleanupInterval = 10000; // 10 secs, in ms +config.mappings.timeout = 1000*60*60*24; // 24 hours, in ms + +// Redis +config.redis = {}; +config.redis.keys = {}; +config.redis.keys.hook = id => `bigbluebutton:webhooks:hook:${id}`; +config.redis.keys.hooks = "bigbluebutton:webhooks:hooks"; +config.redis.keys.mappings = "bigbluebutton:webhooks:mappings"; +config.redis.keys.mapping = id => `bigbluebutton:webhooks:mapping:${id}`; +config.redis.keys.events = id => `bigbluebutton:webhooks:events:${id}`; + +config.api = {}; +config.api.responses = {}; +config.api.responses.failure = (key, msg) => + `<response> \ +<returncode>FAILED</returncode> \ +<messageKey>${key}</messageKey> \ +<message>${msg}</message> \ +</response>` +; +config.api.responses.checksumError = + config.api.responses.failure("checksumError", "You did not pass the checksum security check."); + +config.api.responses.createSuccess = (id, permanent, getRaw) => + `<response> \ +<returncode>SUCCESS</returncode> \ +<hookID>${id}</hookID> \ +<permanentHook>${permanent}</permanentHook> \ +<rawData>${getRaw}</rawData> \ +</response>` +; +config.api.responses.createFailure = + config.api.responses.failure("createHookError", "An error happened while creating your hook. Check the logs."); +config.api.responses.createDuplicated = id => + `<response> \ +<returncode>SUCCESS</returncode> \ +<hookID>${id}</hookID> \ +<messageKey>duplicateWarning</messageKey> \ +<message>There is already a hook for this callback URL.</message> \ +</response>` +; + +config.api.responses.destroySuccess = + `<response> \ +<returncode>SUCCESS</returncode> \ +<removed>true</removed> \ +</response>`; +config.api.responses.destroyFailure = + config.api.responses.failure("destroyHookError", "An error happened while removing your hook. Check the logs."); +config.api.responses.destroyNoHook = + config.api.responses.failure("destroyMissingHook", "The hook informed was not found."); + +config.api.responses.missingParamCallbackURL = + config.api.responses.failure("missingParamCallbackURL", "You must specify a callbackURL in the parameters."); +config.api.responses.missingParamHookID = + config.api.responses.failure("missingParamHookID", "You must specify a hookID in the parameters."); + +module.exports = config; diff --git a/bbb-webhooks/config_local.coffee.example b/bbb-webhooks/config_local.coffee.example deleted file mode 100644 index d8348a0a0835ce5cec5da6ab1ed14cc64e69c224..0000000000000000000000000000000000000000 --- a/bbb-webhooks/config_local.coffee.example +++ /dev/null @@ -1,25 +0,0 @@ -# Local configuration file - -config = {} - -# Shared secret of your BigBlueButton server. -config.bbb = {} -config.bbb.sharedSecret = "33e06642a13942004fd83b3ba6e4104a" - -# The port in which the API server will run. -config.server = {} -config.server.port = 3005 - -# Callbacks will be triggered for all the events in this list and only for these events. -# You only need to specify it if you want events that are not used by default or -# if you want to restrict the events used. See `config.coffee` for the default list. -# -# config.hooks = {} -# config.hooks.events = [ -# { channel: "bigbluebutton:from-bbb-apps:meeting", name: "meeting_created_message" }, -# { channel: "bigbluebutton:from-bbb-apps:meeting", name: "meeting_destroyed_event" }, -# { channel: "bigbluebutton:from-bbb-apps:users", name: "user_joined_message" }, -# { channel: "bigbluebutton:from-bbb-apps:users", name: "user_left_message" } -# ] - -module.exports = config diff --git a/bbb-webhooks/config_local.js.example b/bbb-webhooks/config_local.js.example new file mode 100644 index 0000000000000000000000000000000000000000..7942a0341f9e10c4fe63d1b4567ce977aba839a3 --- /dev/null +++ b/bbb-webhooks/config_local.js.example @@ -0,0 +1,25 @@ +"use strict"; +// Local configuration file + +const config = {}; + +// Shared secret of your BigBlueButton server. +config.bbb = {}; +config.bbb.sharedSecret = "mysharedsecret"; + +// The port in which the API server will run. +config.server = {}; +config.server.port = 3005; + +// Callbacks will be triggered for all the events in this list and only for these events. +config.hooks = {}; +config.hooks.pchannel = "bigbluebutton:*"; +// IP where aggr will be hosted +config.hooks.aggr = ["request.catcher.url", "another.request.catcher.url"] + +// Allow global hook to receive all events with raw data +config.hooks.getRaw = false; + +config.webhooks = {}; + +module.exports = config; diff --git a/bbb-webhooks/hook.coffee b/bbb-webhooks/hook.coffee deleted file mode 100644 index 80695b475dc0c277f82e81e613b00696f7685d37..0000000000000000000000000000000000000000 --- a/bbb-webhooks/hook.coffee +++ /dev/null @@ -1,265 +0,0 @@ -_ = require("lodash") -async = require("async") -redis = require("redis") - -config = require("./config") -CallbackEmitter = require("./callback_emitter") -IDMapping = require("./id_mapping") -Logger = require("./logger") - -# The database of hooks. -# Used always from memory, but saved to redis for persistence. -# -# Format: -# { id: Hook } -# Format on redis: -# * a SET "...:hooks" with all ids -# * a HASH "...:hook:<id>" for each hook with some of its attributes -db = {} -nextID = 1 - -# The representation of a hook and its properties. Stored in memory and persisted -# to redis. -# Hooks can be global, receiving callback calls for events from all meetings on the -# server, or for a specific meeting. If an `externalMeetingID` is set in the hook, -# it will only receive calls related to this meeting, otherwise it will be global. -# Events are kept in a queue to be sent in the order they are received. -# TODO: The queue should be cleared at some point. The hook is destroyed if too many -# callback attempts fail, after ~5min. So the queue is already protected in this case. -# But if the requests are going by but taking too long, the queue might be increasing -# faster than the callbacks are made. -module.exports = class Hook - - constructor: -> - @id = null - @callbackURL = null - @externalMeetingID = null - @queue = [] - @emitter = null - @redisClient = redis.createClient() - @permanent = false - @backupURL = [] - @getRaw = false - - save: (callback) -> - @redisClient.hmset config.redis.keys.hook(@id), @toRedis(), (error, reply) => - Logger.error "[Hook] error saving hook to redis:", error, reply if error? - @redisClient.sadd config.redis.keys.hooks, @id, (error, reply) => - Logger.error "[Hook] error saving hookID to the list of hooks:", error, reply if error? - - db[@id] = this - callback?(error, db[@id]) - - destroy: (callback) -> - @redisClient.srem config.redis.keys.hooks, @id, (error, reply) => - Logger.error "[Hook] error removing hookID from the list of hooks:", error, reply if error? - @redisClient.del config.redis.keys.hook(@id), (error) => - Logger.error "[Hook] error removing hook from redis:", error if error? - - if db[@id] - delete db[@id] - callback?(error, true) - else - callback?(error, false) - - # Is this a global hook? - isGlobal: -> - not @externalMeetingID? - - # The meeting from which this hook should receive events. - targetMeetingID: -> - @externalMeetingID - - # Puts a new message in the queue. Will also trigger a processing in the queue so this - # message might be processed instantly. - enqueue: (message) -> - @redisClient.llen config.redis.keys.events(@id), (error, reply) => - length = reply - if length < config.hooks.queueSize - Logger.info "[Hook] enqueueing message:", JSON.stringify(message) - # Add message to redis queue - @redisClient.rpush config.redis.keys.events(@id), JSON.stringify(message), (error,reply) => - Logger.error "[Hook] error pushing event to redis queue:", JSON.stringify(message), error if error? - @queue.push JSON.stringify(message) - @_processQueue(length + 1) - else - Logger.warn "[Hook] queue size exceed, event:", JSON.stringify(message) - - toRedis: -> - r = - "hookID": @id, - "callbackURL": @callbackURL, - "permanent": @permanent, - "backupURL": @backupURL, - "getRaw": @getRaw - r.externalMeetingID = @externalMeetingID if @externalMeetingID? - r - - fromRedis: (redisData) -> - @id = parseInt(redisData.hookID) - @callbackURL = redisData.callbackURL - @permanent = redisData.permanent - @backupURL = redisData.backupURL - @getRaw = redisData.getRaw - if redisData.externalMeetingID? - @externalMeetingID = redisData.externalMeetingID - else - @externalMeetingID = null - - # Gets the first message in the queue and start an emitter to send it. Will only do it - # if there is no emitter running already and if there is a message in the queue. - _processQueue: (length) -> - # Will try to send up to 10 messages together if they're enqueued - lengthIn = if length > 10 then 10 else length - num = lengthIn + 1 - # Concat messages - message = @queue.slice(0,lengthIn) - message = message.join(";") - - return if not message? or @emitter? or length <= 0 - # Add params so emitter will 'know' when a hook is permanent and have backupURLs - @emitter = new CallbackEmitter(@callbackURL, message, @backupURL) - @emitter.start(@permanent) - - @emitter.on "success", => - delete @emitter - while num -= 1 - # Remove the sent message from redis - @redisClient.lpop config.redis.keys.events(@id), (error, reply) => - Logger.error "[Hook] error removing event from redis queue:", error if error? - @queue.shift() # pop the first message just sent - @_processQueue(length - lengthIn) # go to the next message - - # gave up trying to perform the callback, remove the hook forever if the hook's not permanent (emmiter will validate that) - @emitter.on "stopped", (error) => - Logger.warn "[Hook] too many failed attempts to perform a callback call, removing the hook for:", @callbackURL - @destroy() - - @addSubscription = (callbackURL, meetingID=null, getRaw, callback) -> - #Since we can pass a list of URLs to serve as backup for the permanent hook, we need to check that - firstURL = if callbackURL instanceof Array then callbackURL[0] else callbackURL - - hook = Hook.findByCallbackURLSync(firstURL) - if hook? - callback?(new Error("There is already a subscription for this callback URL"), hook) - else - msg = "[Hook] adding a hook with callback URL: [#{firstURL}]," - msg += " for the meeting: [#{meetingID}]" if meetingID? - Logger.info msg - - hook = new Hook() - hook.callbackURL = firstURL - hook.externalMeetingID = meetingID - hook.getRaw = getRaw - hook.permanent = if firstURL in config.hooks.aggr then true else false - if hook.permanent then hook.id = 1;nextID++ else hook.id = nextID++ - # Create backup URLs list - backupURLs = if callbackURL instanceof Array then callbackURL else [] - backupURLs.push(firstURL); backupURLs.shift() - hook.backupURL = backupURLs - Logger.info "[Hook] Backup URLs:", hook.backupURL - # Sync permanent queue - if hook.permanent - hook.redisClient.llen config.redis.keys.events(hook.id), (error, len) => - if len > 0 - length = len - hook.redisClient.lrange config.redis.keys.events(hook.id), 0, len, (error, elements) => - elements.forEach (element) => - hook.queue.push element - hook._processQueue(length) if hook.queue.length > 0 - hook.save (error, hook) -> callback?(error, hook) - - @removeSubscription = (hookID, callback) -> - hook = Hook.getSync(hookID) - if hook? and hook.permanent is "false" or hook.permanent is false - msg = "[Hook] removing the hook with callback URL: [#{hook.callbackURL}]," - msg += " for the meeting: [#{hook.externalMeetingID}]" if hook.externalMeetingID? - Logger.info msg - - hook.destroy (error, removed) -> callback?(error, removed) - else - callback?(null, false) - - @countSync = -> - Object.keys(db).length - - @getSync = (id) -> - db[id] - - @firstSync = -> - keys = Object.keys(db) - if keys.length > 0 - db[keys[0]] - else - null - - @findByExternalMeetingIDSync = (externalMeetingID) -> - hooks = Hook.allSync() - _.filter(hooks, (hook) -> - (externalMeetingID? and externalMeetingID is hook.externalMeetingID) - ) - - @allGlobalSync = -> - hooks = Hook.allSync() - _.filter(hooks, (hook) -> hook.isGlobal()) - - @allSync = -> - arr = Object.keys(db).reduce((arr, id) -> - arr.push db[id] - arr - , []) - arr - - @clearSync = -> - for id of db - delete db[id] - db = {} - - @findByCallbackURLSync = (callbackURL) -> - for id of db - if db[id].callbackURL is callbackURL - return db[id] - - @initialize = (callback) -> - Hook.resync(callback) - - # Gets all hooks from redis to populate the local database. - # Calls `callback()` when done. - @resync = (callback) -> - client = redis.createClient() - # Remove previous permanent hook (always ID = 1) - client.srem config.redis.keys.hooks, 1, (error, reply) => - Logger.error "[Hook] error removing previous permanent hook from list:", error if error? - client.del config.redis.keys.hook(1), (error) => - Logger.error "[Hook] error removing previous permanent hook from redis:", error if error? - - tasks = [] - - client.smembers config.redis.keys.hooks, (error, hooks) => - Logger.error "[Hook] error getting list of hooks from redis:", error if error? - hooks.forEach (id) => - tasks.push (done) => - client.hgetall config.redis.keys.hook(id), (error, hookData) -> - Logger.error "[Hook] error getting information for a hook from redis:", error if error? - - if hookData? - hook = new Hook() - hook.fromRedis(hookData) - # sync events queue - client.llen config.redis.keys.events(hook.id), (error, len) => - length = len - client.lrange config.redis.keys.events(hook.id), 0, len, (error, elements) => - elements.forEach (element) => - hook.queue.push element - # Persist hook to redis - hook.save (error, hook) -> - nextID = hook.id + 1 if hook.id >= nextID - hook._processQueue(hook.queue.length) if hook.queue.length > 0 - done(null, hook) - else - done(null, null) - - async.series tasks, (errors, result) -> - hooks = _.map(Hook.allSync(), (hook) -> "[#{hook.id}] #{hook.callbackURL}") - Logger.info "[Hook] finished resync, hooks registered:", hooks - callback?() diff --git a/bbb-webhooks/hook.js b/bbb-webhooks/hook.js new file mode 100644 index 0000000000000000000000000000000000000000..8ab3616149d4128d1711b40b10aab8eea7702afd --- /dev/null +++ b/bbb-webhooks/hook.js @@ -0,0 +1,327 @@ +"use strict"; +let Hook; +const _ = require("lodash"); +const async = require("async"); +const redis = require("redis"); + +const config = require("./config.js"); +const CallbackEmitter = require("./callback_emitter.js"); +const IDMapping = require("./id_mapping.js"); +const Logger = require("./logger.js"); + +// The database of hooks. +// Used always from memory, but saved to redis for persistence. +// +// Format: +// { id: Hook } +// Format on redis: +// * a SET "...:hooks" with all ids +// * a HASH "...:hook:<id>" for each hook with some of its attributes +let db = {}; +let nextID = 1; + +// The representation of a hook and its properties. Stored in memory and persisted +// to redis. +// Hooks can be global, receiving callback calls for events from all meetings on the +// server, or for a specific meeting. If an `externalMeetingID` is set in the hook, +// it will only receive calls related to this meeting, otherwise it will be global. +// Events are kept in a queue to be sent in the order they are received. +// But if the requests are going by but taking too long, the queue might be increasing +// faster than the callbacks are made. In this case the events will be concatenated +// and send up to 10 events in every post + +module.exports = (Hook = class Hook { + + constructor() { + this.id = null; + this.callbackURL = null; + this.externalMeetingID = null; + this.queue = []; + this.emitter = null; + this.redisClient = redis.createClient(); + this.permanent = false; + this.backupURL = []; + this.getRaw = false; + } + + save(callback) { + this.redisClient.hmset(config.redis.keys.hook(this.id), this.toRedis(), (error, reply) => { + if (error != null) { Logger.error("[Hook] error saving hook to redis:", error, reply); } + this.redisClient.sadd(config.redis.keys.hooks, this.id, (error, reply) => { + if (error != null) { Logger.error("[Hook] error saving hookID to the list of hooks:", error, reply); } + + db[this.id] = this; + (typeof callback === 'function' ? callback(error, db[this.id]) : undefined); + }); + }); + } + + destroy(callback) { + this.redisClient.srem(config.redis.keys.hooks, this.id, (error, reply) => { + if (error != null) { Logger.error("[Hook] error removing hookID from the list of hooks:", error, reply); } + this.redisClient.del(config.redis.keys.hook(this.id), error => { + if (error != null) { Logger.error("[Hook] error removing hook from redis:", error); } + + if (db[this.id]) { + delete db[this.id]; + (typeof callback === 'function' ? callback(error, true) : undefined); + } else { + (typeof callback === 'function' ? callback(error, false) : undefined); + } + }); + }); + } + + // Is this a global hook? + isGlobal() { + return (this.externalMeetingID == null); + } + + // The meeting from which this hook should receive events. + targetMeetingID() { + return this.externalMeetingID; + } + + // Puts a new message in the queue. Will also trigger a processing in the queue so this + // message might be processed instantly. + enqueue(message) { + this.redisClient.llen(config.redis.keys.events(this.id), (error, reply) => { + const length = reply; + if (length < config.hooks.queueSize) { + Logger.info(`[Hook] ${this.callbackURL} enqueueing message:`, JSON.stringify(message)); + // Add message to redis queue + this.redisClient.rpush(config.redis.keys.events(this.id), JSON.stringify(message), (error,reply) => {}); + if (error != null) { Logger.error("[Hook] error pushing event to redis queue:", JSON.stringify(message), error); } + this.queue.push(JSON.stringify(message)); + this._processQueue(); + } else { + Logger.warn("[Hook] queue size exceed, event:", JSON.stringify(message)); + } + }); + } + + toRedis() { + const r = { + "hookID": this.id, + "callbackURL": this.callbackURL, + "permanent": this.permanent, + "backupURL": this.backupURL, + "getRaw": this.getRaw + }; + if (this.externalMeetingID != null) { r.externalMeetingID = this.externalMeetingID; } + return r; + } + + fromRedis(redisData) { + this.id = parseInt(redisData.hookID); + this.callbackURL = redisData.callbackURL; + this.permanent = redisData.permanent; + this.backupURL = redisData.backupURL; + this.getRaw = redisData.getRaw; + if (redisData.externalMeetingID != null) { + this.externalMeetingID = redisData.externalMeetingID; + } else { + this.externalMeetingID = null; + } + } + + // Gets the first message in the queue and start an emitter to send it. Will only do it + // if there is no emitter running already and if there is a message in the queue. + _processQueue() { + // Will try to send up to 10 messages together if they're enqueued + const lengthIn = this.queue.length > 10 ? 10 : this.queue.length; + let num = lengthIn + 1; + // Concat messages + let message = this.queue.slice(0,lengthIn); + message = message.join(","); + + if ((message == null) || (this.emitter != null) || (lengthIn <= 0)) { return; } + // Add params so emitter will 'know' when a hook is permanent and have backupURLs + this.emitter = new CallbackEmitter(this.callbackURL, message, this.backupURL); + this.emitter.start(this.permanent); + + this.emitter.on("success", () => { + delete this.emitter; + while ((num -= 1)) { + // Remove the sent message from redis + this.redisClient.lpop(config.redis.keys.events(this.id), (error, reply) => { + if (error != null) { return Logger.error("[Hook] error removing event from redis queue:", error); } + }); + this.queue.shift(); + } // pop the first message just sent + this._processQueue(); // go to the next message + }); + + // gave up trying to perform the callback, remove the hook forever if the hook's not permanent (emmiter will validate that) + return this.emitter.on("stopped", error => { + Logger.warn("[Hook] too many failed attempts to perform a callback call, removing the hook for:", this.callbackURL); + this.destroy(); + }); + } + + static addSubscription(callbackURL, meetingID, getRaw, callback) { + //Since we can pass a list of URLs to serve as backup for the permanent hook, we need to check that + const firstURL = callbackURL instanceof Array ? callbackURL[0] : callbackURL; + + let hook = Hook.findByCallbackURLSync(firstURL); + if (hook != null) { + return (typeof callback === 'function' ? callback(new Error("There is already a subscription for this callback URL"), hook) : undefined); + } else { + let msg = `[Hook] adding a hook with callback URL: [${firstURL}],`; + if (meetingID != null) { msg += ` for the meeting: [${meetingID}]`; } + Logger.info(msg); + + hook = new Hook(); + hook.callbackURL = firstURL; + hook.externalMeetingID = meetingID; + hook.getRaw = getRaw; + hook.permanent = config.hooks.aggr.some( url => { + return url === firstURL + }); + if (hook.permanent) { hook.id = 1;nextID++; } else { hook.id = nextID++; } + // Create backup URLs list + let backupURLs = callbackURL instanceof Array ? callbackURL : []; + backupURLs.push(firstURL); backupURLs.shift(); + hook.backupURL = backupURLs; + Logger.info("[Hook] Backup URLs:", hook.backupURL); + // Sync permanent queue + if (hook.permanent) { + hook.redisClient.llen(config.redis.keys.events(hook.id), (error, len) => { + if (len > 0) { + const length = len; + hook.redisClient.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => { + elements.forEach(element => { + hook.queue.push(element); + }); + if (hook.queue.length > 0) { return hook._processQueue(); } + }); + } + }); + } + hook.save((error, hook) => typeof callback === 'function' ? callback(error, hook) : undefined); + } + } + + static removeSubscription(hookID, callback) { + let hook = Hook.getSync(hookID); + if (((hook != null) && (hook.permanent === "false")) || (hook.permanent === false)) { + let msg = `[Hook] removing the hook with callback URL: [${hook.callbackURL}],`; + if (hook.externalMeetingID != null) { msg += ` for the meeting: [${hook.externalMeetingID}]`; } + Logger.info(msg); + + hook.destroy((error, removed) => typeof callback === 'function' ? callback(error, removed) : undefined); + } else { + return (typeof callback === 'function' ? callback(null, false) : undefined); + } + } + + static countSync() { + return Object.keys(db).length; + } + + static getSync(id) { + return db[id]; + } + + static firstSync() { + const keys = Object.keys(db); + if (keys.length > 0) { + return db[keys[0]]; + } else { + return null; + } + } + + static findByExternalMeetingIDSync(externalMeetingID) { + const hooks = Hook.allSync(); + return _.filter(hooks, hook => (externalMeetingID != null) && (externalMeetingID === hook.externalMeetingID)); + } + + static allGlobalSync() { + const hooks = Hook.allSync(); + return _.filter(hooks, hook => hook.isGlobal()); + } + + static allSync() { + let arr = Object.keys(db).reduce(function(arr, id) { + arr.push(db[id]); + return arr; + } + , []); + return arr; + } + + static clearSync() { + for (let id in db) { + delete db[id]; + } + return db = {}; + } + + static findByCallbackURLSync(callbackURL) { + for (let id in db) { + if (db[id].callbackURL === callbackURL) { + return db[id]; + } + } + } + + static initialize(callback) { + Hook.resync(callback); + } + + // Gets all hooks from redis to populate the local database. + // Calls `callback()` when done. + static resync(callback) { + let client = redis.createClient(); + // Remove previous permanent hook (always ID = 1) + client.srem(config.redis.keys.hooks, 1, (error, reply) => { + if (error != null) { Logger.error("[Hook] error removing previous permanent hook from list:", error); } + client.del(config.redis.keys.hook(1), error => { + if (error != null) { Logger.error("[Hook] error removing previous permanent hook from redis:", error); } + }); + }); + + let tasks = []; + + client.smembers(config.redis.keys.hooks, (error, hooks) => { + if (error != null) { Logger.error("[Hook] error getting list of hooks from redis:", error); } + hooks.forEach(id => { + tasks.push(done => { + client.hgetall(config.redis.keys.hook(id), function(error, hookData) { + if (error != null) { Logger.error("[Hook] error getting information for a hook from redis:", error); } + + if (hookData != null) { + let length; + let hook = new Hook(); + hook.fromRedis(hookData); + // sync events queue + client.llen(config.redis.keys.events(hook.id), (error, len) => { + length = len; + client.lrange(config.redis.keys.events(hook.id), 0, len, (error, elements) => { + elements.forEach(element => { + hook.queue.push(element); + }); + }); + }); + // Persist hook to redis + hook.save( (error, hook) => { + if (hook.id >= nextID) { nextID = hook.id + 1; } + if (hook.queue.length > 0) { hook._processQueue(); } + done(null, hook); + }); + } else { + done(null, null); + } + }); + }); + }); + + async.series(tasks, function(errors, result) { + hooks = _.map(Hook.allSync(), hook => `[${hook.id}] ${hook.callbackURL}`); + Logger.info("[Hook] finished resync, hooks registered:", hooks); + (typeof callback === 'function' ? callback() : undefined); + }); + }); + } +}); diff --git a/bbb-webhooks/id_mapping.coffee b/bbb-webhooks/id_mapping.coffee deleted file mode 100644 index ec9a8cc9c00260488840a8ce24634914d8852210..0000000000000000000000000000000000000000 --- a/bbb-webhooks/id_mapping.coffee +++ /dev/null @@ -1,163 +0,0 @@ -_ = require("lodash") -async = require("async") -redis = require("redis") - -config = require("./config") -Logger = require("./logger") - -# The database of mappings. Uses the internal ID as key because it is unique -# unlike the external ID. -# Used always from memory, but saved to redis for persistence. -# -# Format: -# { -# internalMeetingID: { -# id: @id -# externalMeetingID: @externalMeetingID -# internalMeetingID: @internalMeetingID -# lastActivity: @lastActivity -# } -# } -# Format on redis: -# * a SET "...:mappings" with all ids (not meeting ids, the object id) -# * a HASH "...:mapping:<id>" for each mapping with all its attributes -db = {} -nextID = 1 - -# A simple model to store mappings for meeting IDs. -module.exports = class IDMapping - - constructor: -> - @id = null - @externalMeetingID = null - @internalMeetingID = null - @lastActivity = null - @redisClient = redis.createClient() - - save: (callback) -> - @redisClient.hmset config.redis.keys.mapping(@id), @toRedis(), (error, reply) => - Logger.error "[IDMapping] error saving mapping to redis:", error, reply if error? - @redisClient.sadd config.redis.keys.mappings, @id, (error, reply) => - Logger.error "[IDMapping] error saving mapping ID to the list of mappings:", error, reply if error? - - db[@internalMeetingID] = this - callback?(error, db[@internalMeetingID]) - - destroy: (callback) -> - @redisClient.srem config.redis.keys.mappings, @id, (error, reply) => - Logger.error "[IDMapping] error removing mapping ID from the list of mappings:", error, reply if error? - @redisClient.del config.redis.keys.mapping(@id), (error) => - Logger.error "[IDMapping] error removing mapping from redis:", error if error? - - if db[@internalMeetingID] - delete db[@internalMeetingID] - callback?(error, true) - else - callback?(error, false) - - toRedis: -> - r = - "id": @id, - "internalMeetingID": @internalMeetingID - "externalMeetingID": @externalMeetingID - "lastActivity": @lastActivity - r - - fromRedis: (redisData) -> - @id = parseInt(redisData.id) - @externalMeetingID = redisData.externalMeetingID - @internalMeetingID = redisData.internalMeetingID - @lastActivity = redisData.lastActivity - - print: -> - JSON.stringify(@toRedis()) - - @addOrUpdateMapping = (internalMeetingID, externalMeetingID, callback) -> - mapping = new IDMapping() - mapping.id = nextID++ - mapping.internalMeetingID = internalMeetingID - mapping.externalMeetingID = externalMeetingID - mapping.lastActivity = new Date().getTime() - mapping.save (error, result) -> - Logger.info "[IDMapping] added or changed meeting mapping to the list #{externalMeetingID}:", mapping.print() - callback?(error, result) - - @removeMapping = (internalMeetingID, callback) -> - for internal, mapping of db - if mapping.internalMeetingID is internalMeetingID - mapping.destroy (error, result) -> - Logger.info "[IDMapping] removing meeting mapping from the list #{external}:", mapping.print() - callback?(error, result) - - @getInternalMeetingID = (externalMeetingID) -> - mapping = IDMapping.findByExternalMeetingID(externalMeetingID) - mapping?.internalMeetingID - - @getExternalMeetingID = (internalMeetingID) -> - db[internalMeetingID].externalMeetingID - - @findByExternalMeetingID = (externalMeetingID) -> - if externalMeetingID? - for internal, mapping of db - if mapping.externalMeetingID is externalMeetingID - return mapping - null - - @allSync = -> - arr = Object.keys(db).reduce((arr, id) -> - arr.push db[id] - arr - , []) - arr - - # Sets the last activity of the mapping for `internalMeetingID` to now. - @reportActivity = (internalMeetingID) -> - mapping = db[internalMeetingID] - if mapping? - mapping.lastActivity = new Date().getTime() - mapping.save() - - # Checks all current mappings for their last activity and removes the ones that - # are "expired", that had their last activity too long ago. - @cleanup = -> - now = new Date().getTime() - all = IDMapping.allSync() - toRemove = _.filter(all, (mapping) -> - mapping.lastActivity < now - config.mappings.timeout - ) - unless _.isEmpty(toRemove) - Logger.info "[IDMapping] expiring the mappings:", _.map(toRemove, (map) -> map.print()) - toRemove.forEach (mapping) -> mapping.destroy() - - # Initializes global methods for this model. - @initialize = (callback) -> - IDMapping.resync(callback) - IDMapping.cleanupInterval = setInterval(IDMapping.cleanup, config.mappings.cleanupInterval) - - # Gets all mappings from redis to populate the local database. - # Calls `callback()` when done. - @resync = (callback) -> - client = redis.createClient() - tasks = [] - - client.smembers config.redis.keys.mappings, (error, mappings) => - Logger.error "[IDMapping] error getting list of mappings from redis:", error if error? - - mappings.forEach (id) => - tasks.push (done) => - client.hgetall config.redis.keys.mapping(id), (error, mappingData) -> - Logger.error "[IDMapping] error getting information for a mapping from redis:", error if error? - - if mappingData? - mapping = new IDMapping() - mapping.fromRedis(mappingData) - mapping.save (error, hook) -> - nextID = mapping.id + 1 if mapping.id >= nextID - done(null, mapping) - else - done(null, null) - - async.series tasks, (errors, result) -> - mappings = _.map(IDMapping.allSync(), (m) -> m.print()) - Logger.info "[IDMapping] finished resync, mappings registered:", mappings - callback?() diff --git a/bbb-webhooks/id_mapping.js b/bbb-webhooks/id_mapping.js new file mode 100644 index 0000000000000000000000000000000000000000..c94178a62ea4863db09ce2772c6fd0185c10cbd3 --- /dev/null +++ b/bbb-webhooks/id_mapping.js @@ -0,0 +1,211 @@ +"use strict"; +let IDMapping; +const _ = require("lodash"); +const async = require("async"); +const redis = require("redis"); + +const config = require("./config.js"); +const Logger = require("./logger.js"); + +// The database of mappings. Uses the internal ID as key because it is unique +// unlike the external ID. +// Used always from memory, but saved to redis for persistence. +// +// Format: +// { +// internalMeetingID: { +// id: @id +// externalMeetingID: @externalMeetingID +// internalMeetingID: @internalMeetingID +// lastActivity: @lastActivity +// } +// } +// Format on redis: +// * a SET "...:mappings" with all ids (not meeting ids, the object id) +// * a HASH "...:mapping:<id>" for each mapping with all its attributes +const db = {}; +let nextID = 1; + +// A simple model to store mappings for meeting IDs. +module.exports = (IDMapping = class IDMapping { + + constructor() { + this.id = null; + this.externalMeetingID = null; + this.internalMeetingID = null; + this.lastActivity = null; + this.redisClient = redis.createClient(); + } + + save(callback) { + this.redisClient.hmset(config.redis.keys.mapping(this.id), this.toRedis(), (error, reply) => { + if (error != null) { Logger.error("[IDMapping] error saving mapping to redis:", error, reply); } + this.redisClient.sadd(config.redis.keys.mappings, this.id, (error, reply) => { + if (error != null) { Logger.error("[IDMapping] error saving mapping ID to the list of mappings:", error, reply); } + + db[this.internalMeetingID] = this; + (typeof callback === 'function' ? callback(error, db[this.internalMeetingID]) : undefined); + }); + }); + } + + destroy(callback) { + this.redisClient.srem(config.redis.keys.mappings, this.id, (error, reply) => { + if (error != null) { Logger.error("[IDMapping] error removing mapping ID from the list of mappings:", error, reply); } + this.redisClient.del(config.redis.keys.mapping(this.id), error => { + if (error != null) { Logger.error("[IDMapping] error removing mapping from redis:", error); } + + if (db[this.internalMeetingID]) { + delete db[this.internalMeetingID]; + (typeof callback === 'function' ? callback(error, true) : undefined); + } else { + (typeof callback === 'function' ? callback(error, false) : undefined); + } + }); + }); + } + + toRedis() { + const r = { + "id": this.id, + "internalMeetingID": this.internalMeetingID, + "externalMeetingID": this.externalMeetingID, + "lastActivity": this.lastActivity + }; + return r; + } + + fromRedis(redisData) { + this.id = parseInt(redisData.id); + this.externalMeetingID = redisData.externalMeetingID; + this.internalMeetingID = redisData.internalMeetingID; + this.lastActivity = redisData.lastActivity; + } + + print() { + return JSON.stringify(this.toRedis()); + } + + static addOrUpdateMapping(internalMeetingID, externalMeetingID, callback) { + let mapping = new IDMapping(); + mapping.id = nextID++; + mapping.internalMeetingID = internalMeetingID; + mapping.externalMeetingID = externalMeetingID; + mapping.lastActivity = new Date().getTime(); + mapping.save(function(error, result) { + Logger.info(`[IDMapping] added or changed meeting mapping to the list ${externalMeetingID}:`, mapping.print()); + (typeof callback === 'function' ? callback(error, result) : undefined); + }); + } + + static removeMapping(internalMeetingID, callback) { + return (() => { + let result = []; + for (let internal in db) { + var mapping = db[internal]; + if (mapping.internalMeetingID === internalMeetingID) { + result.push(mapping.destroy( (error, result) => { + Logger.info(`[IDMapping] removing meeting mapping from the list ${external}:`, mapping.print()); + return (typeof callback === 'function' ? callback(error, result) : undefined); + })); + } else { + result.push(undefined); + } + } + return result; + })(); + } + + static getInternalMeetingID(externalMeetingID) { + const mapping = IDMapping.findByExternalMeetingID(externalMeetingID); + return (mapping != null ? mapping.internalMeetingID : undefined); + } + + static getExternalMeetingID(internalMeetingID) { + return db[internalMeetingID].externalMeetingID; + } + + static findByExternalMeetingID(externalMeetingID) { + if (externalMeetingID != null) { + for (let internal in db) { + const mapping = db[internal]; + if (mapping.externalMeetingID === externalMeetingID) { + return mapping; + } + } + } + return null; + } + + static allSync() { + let arr = Object.keys(db).reduce(function(arr, id) { + arr.push(db[id]); + return arr; + } + , []); + return arr; + } + + // Sets the last activity of the mapping for `internalMeetingID` to now. + static reportActivity(internalMeetingID) { + let mapping = db[internalMeetingID]; + if (mapping != null) { + mapping.lastActivity = new Date().getTime(); + return mapping.save(); + } + } + + // Checks all current mappings for their last activity and removes the ones that + // are "expired", that had their last activity too long ago. + static cleanup() { + const now = new Date().getTime(); + const all = IDMapping.allSync(); + const toRemove = _.filter(all, mapping => mapping.lastActivity < (now - config.mappings.timeout)); + if (!_.isEmpty(toRemove)) { + Logger.info("[IDMapping] expiring the mappings:", _.map(toRemove, map => map.print())); + toRemove.forEach(mapping => mapping.destroy()); + } + } + + // Initializes global methods for this model. + static initialize(callback) { + IDMapping.resync(callback); + IDMapping.cleanupInterval = setInterval(IDMapping.cleanup, config.mappings.cleanupInterval); + } + + // Gets all mappings from redis to populate the local database. + // Calls `callback()` when done. + static resync(callback) { + let client = redis.createClient(); + let tasks = []; + + return client.smembers(config.redis.keys.mappings, (error, mappings) => { + if (error != null) { Logger.error("[IDMapping] error getting list of mappings from redis:", error); } + + mappings.forEach(id => { + tasks.push(done => { + client.hgetall(config.redis.keys.mapping(id), function(error, mappingData) { + if (error != null) { Logger.error("[IDMapping] error getting information for a mapping from redis:", error); } + + if (mappingData != null) { + let mapping = new IDMapping(); + mapping.fromRedis(mappingData); + mapping.save(function(error, hook) { + if (mapping.id >= nextID) { nextID = mapping.id + 1; } + done(null, mapping); + }); + } else { + done(null, null); + } + }); + }); + }); + + return async.series(tasks, function(errors, result) { + mappings = _.map(IDMapping.allSync(), m => m.print()); + Logger.info("[IDMapping] finished resync, mappings registered:", mappings); + return (typeof callback === 'function' ? callback() : undefined); + }); + }); + } +}); diff --git a/bbb-webhooks/logger.coffee b/bbb-webhooks/logger.js similarity index 61% rename from bbb-webhooks/logger.coffee rename to bbb-webhooks/logger.js index 384661124323b1ff3f27ab8035cdc21e82bb5863..f37f162bf10a55070eedbda5d4f7a6dc958f5d41 100644 --- a/bbb-webhooks/logger.coffee +++ b/bbb-webhooks/logger.js @@ -1,10 +1,11 @@ -winston = require("winston") +"use strict"; +const winston = require("winston"); -logger = new (winston.Logger)( +const logger = new (winston.Logger)({ transports: [ new (winston.transports.Console)({ timestamp: true, colorize: true }), new (winston.transports.File)({ filename: "log/application.log", timestamp: true }) ] -) +}); -module.exports = logger +module.exports = logger; diff --git a/bbb-webhooks/messageMapping.coffee b/bbb-webhooks/messageMapping.coffee deleted file mode 100644 index 62227211e9ff300276f499631e1fa3b79e714a79..0000000000000000000000000000000000000000 --- a/bbb-webhooks/messageMapping.coffee +++ /dev/null @@ -1,176 +0,0 @@ -config = require("./config") -Logger = require("./logger") -IDMapping = require("./id_mapping") -module.exports = class MessageMapping - - constructor: -> - @mappedObject = {} - @mappedMessage = {} - @meetingEvents = ["meeting_created_message","meeting_destroyed_event"] - @userEvents = ["meeting_destroyed_event","user_joined_message","user_left_message","user_listening_only","user_joined_voice_message","user_left_voice_message"] - @chatEvents = ["send_public_chat_message","send_private_chat_message"] - @rapEvents = ["archive_started","archive_ended","sanity_started","sanity_ended","post_archive_started","post_archive_ended","process_started","process_ended","post_process_started","post_process_ended","publish_started","publish_ended","post_publish_started","post_publish_ended"] - - # Map internal message based on it's type - mapMessage: (messageObj) -> - if messageObj.header?.name in @meetingEvents - @meetingTemplate(messageObj) - else if messageObj.header?.name in @userEvents - @userTemplate(messageObj) - else if messageObj.header?.name in @chatEvents - @chatTemplate(messageObj) - else if messageObj.header?.name in @rapEvents - @rapTemplate(messageObj) - - # Map internal to external message for meeting information - meetingTemplate: (messageObj) -> - @mappedObject.data = { - "type": "event", - "id": @mapInternalMessage(messageObj.header.name), - "attributes":{ - "meeting":{ - "internal-meeting-id": messageObj.payload.meeting_id, - "external-meeting-id": messageObj.payload.external_meeting_id - } - }, - "event":{ - "ts": messageObj.header.current_time - } - } - if messageObj.header.name is "meeting_created_message" - @mappedObject.data.attributes = { - "meeting":{ - "internal-meeting-id": messageObj.payload.meeting_id, - "external-meeting-id": messageObj.payload.external_meeting_id - }, - "name": messageObj.payload.name, - "is-breakout": messageObj.payload.is_breakout, - "duration": messageObj.payload.duration, - "create-time": messageObj.payload.create_time, - "create-date": messageObj.payload.create_date, - "moderator-pass": messageObj.payload.moderator_pass, - "viewer-pass": messageObj.payload.viewer_pass, - "recorded": messageObj.payload.recorded, - #"record": ?, - "voice-conf": messageObj.payload.voice_conf, - #"dial-number": ?, - "max-users": messageObj.payload.max_users, - "metadata": {} - } - @mappedMessage = JSON.stringify(@mappedObject) - Logger.info "[MessageMapping] Mapped message:", @mappedMessage - - # Map internal to external message for user information - userTemplate: (messageObj) -> - # Specific verification for listen_only event - messageObj.header.name += if messageObj.payload.listen_only then "_true" else "" - userid = if messageObj.payload.user? then messageObj.payload.user.userid else messageObj.payload.userid - extid = if messageObj.payload.user? then messageObj.payload.user.extern_userid else "" - @mappedObject.data = { - "type": "event", - "id": @mapInternalMessage(messageObj.header.name), - "attributes":{ - "meeting":{ - "internal-meeting-id": messageObj.payload.meeting_id, - "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) - }, - "user":{ - "internal-user-id": userid, - "external-user-id": extid - } - }, - "event":{ - "ts": messageObj.header.current_time - } - } - if messageObj.header.name is "user_joined_message" - @mappedObject.data.attributes.user = { - "internal-user-id": messageObj.payload.user.userid, - "external-user-id": messageObj.payload.user.extern_userid, - "name": messageObj.payload.user.name, - "role": messageObj.payload.user.role, - "presenter": messageObj.payload.user.presenter, - "sharing-mic": messageObj.payload.user.voiceUser.joined - "sharing-video": messageObj.payload.user.has_stream, - "listening-only": messageObj.payload.user.listenOnly - } - @mappedMessage = JSON.stringify(@mappedObject) - Logger.info "[MessageMapping] Mapped message:", @mappedMessage - - # Map internal to external message for chat information - chatTemplate: (messageObj) -> - @mappedObject.data = { - "type": "event", - "id": @mapInternalMessage(messageObj.header.name), - "attributes":{ - "meeting":{ - "internal-meeting-id": messageObj.payload.meeting_id, - "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) - }, - "chat-message":{ - "message": messageObj.payload.message.message, - "sender":{ - "internal-user-id": messageObj.payload.message.fromUserID, - "external-user-id": messageObj.payload.message.fromUsername, - "timezone-offset": messageObj.payload.message.fromTimeZoneOffset, - "time": messageObj.payload.message.fromTime - } - } - }, - "event":{ - "ts": messageObj.header.current_time - } - } - if messageObj.header.name.indexOf("private") != -1 - @mappedObject.data.attributes["chat-message"].receiver = { - "internal-user-id": messageObj.payload.message.toUserID, - "external-user-id": messageObj.payload.message.toUsername - } - @mappedMessage = JSON.stringify(@mappedObject) - Logger.info "[MessageMapping] Mapped message:", @mappedMessage - - rapTemplate: (messageObj) -> - @mappedObject.data = { - "type": "event", - "id": @mapInternalMessage(messageObj.header.name), - "attributes":{ - "meeting":{ - "internal-meeting-id": messageObj.payload.meeting_id, - "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) - } - }, - "event":{ - "ts": messageObj.header.current_time - } - } - @mappedMessage = JSON.stringify(@mappedObject) - Logger.info "[MessageMapping] Mapped message:", @mappedMessage - - - mapInternalMessage: (message) -> - mappedMsg = switch message - when "meeting_created_message" then "meeting-created" - when "meeting_destroyed_event" then "meeting-ended" - when "user_joined_message" then "user-joined" - when "user_left_message" then "user-left" - when "user_listening_only_true" then "user-audio-listen-only-enabled" - when "user_listening_only" then "user-audio-listen-only-disabled" - when "user_joined_voice_message" then "user-audio-voice-enabled" - when "user_left_voice_message" then "user-audio-voice-disabled" - when "send_public_chat_message" then "chat-public-message-sent" - when "send_private_chat_message" then "chat-private-message-sent" - when "archive_started" then "rap-archive-started" - when "archive_ended" then "rap-archive-ended" - when "sanity_started" then "rap-sanity-started" - when "sanity_ended" then "rap-sanity-ended" - when "post_archive_started" then "rap-post-archive-started" - when "post_archive_ended" then "rap-post-archive-ended" - when "process_started" then "rap-process-started" - when "process_ended" then "rap-process-ended" - when "post_process_started" then "rap-post-process-started" - when "post_process_ended" then "rap-post-process-ended" - when "publish_started" then "rap-publish-started" - when "publish_ended" then "rap-publish-ended" - when "post_publish_started" then "rap-post-publish-started" - when "post_publish_ended" then "rap-post-publish-ended" - mappedMsg diff --git a/bbb-webhooks/messageMapping.js b/bbb-webhooks/messageMapping.js new file mode 100644 index 0000000000000000000000000000000000000000..b8b9721469af00e6ee7cb97a65fa38c49b3c4310 --- /dev/null +++ b/bbb-webhooks/messageMapping.js @@ -0,0 +1,199 @@ +"use strict"; +let MessageMapping; +const config = require("./config.js"); +const Logger = require("./logger.js"); +const IDMapping = require("./id_mapping.js"); +module.exports = (MessageMapping = class MessageMapping { + + constructor() { + this.mappedObject = {}; + this.mappedMessage = {}; + this.meetingEvents = ["meeting_created_message","meeting_destroyed_event"]; + this.userEvents = ["meeting_destroyed_event","user_joined_message","user_left_message","user_listening_only","user_joined_voice_message","user_left_voice_message"]; + this.chatEvents = ["send_public_chat_message","send_private_chat_message"]; + this.rapEvents = ["archive_started","archive_ended","sanity_started","sanity_ended","post_archive_started","post_archive_ended","process_started","process_ended","post_process_started","post_process_ended","publish_started","publish_ended","post_publish_started","post_publish_ended"]; + } + + // Map internal message based on it's type + mapMessage(messageObj) { + if (this.meetingEvents.some( event => { + return (messageObj.header != null ? messageObj.header.name : undefined) === event + })) { + this.meetingTemplate(messageObj); + } else if (this.userEvents.some( event => { + return (messageObj.header != null ? messageObj.header.name : undefined) === event + })) { + this.userTemplate(messageObj); + } else if (this.chatEvents.some( event => { + return (messageObj.header != null ? messageObj.header.name : undefined) === event + })) { + this.chatTemplate(messageObj); + } else if (this.rapEvents.some( event => { + return (messageObj.header != null ? messageObj.header.name : undefined) === event + })) { + this.rapTemplate(messageObj); + } + } + + // Map internal to external message for meeting information + meetingTemplate(messageObj) { + this.mappedObject.data = { + "type": "event", + "id": this.mapInternalMessage(messageObj.header.name), + "attributes":{ + "meeting":{ + "internal-meeting-id": messageObj.payload.meeting_id, + "external-meeting-id": messageObj.payload.external_meeting_id + } + }, + "event":{ + "ts": messageObj.header.current_time + } + }; + if (messageObj.header.name === "meeting_created_message") { + this.mappedObject.data.attributes = { + "meeting":{ + "internal-meeting-id": messageObj.payload.meeting_id, + "external-meeting-id": messageObj.payload.external_meeting_id + }, + "name": messageObj.payload.name, + "is-breakout": messageObj.payload.is_breakout, + "duration": messageObj.payload.duration, + "create-time": messageObj.payload.create_time, + "create-date": messageObj.payload.create_date, + "moderator-pass": messageObj.payload.moderator_pass, + "viewer-pass": messageObj.payload.viewer_pass, + "recorded": messageObj.payload.recorded, + //"record": ?, + "voice-conf": messageObj.payload.voice_conf, + //"dial-number": ?, + "max-users": messageObj.payload.max_users, + "metadata": {} + }; + } + this.mappedMessage = JSON.stringify(this.mappedObject); + Logger.info("[MessageMapping] Mapped message:", this.mappedMessage); + } + + // Map internal to external message for user information + userTemplate(messageObj) { + // Specific verification for listen_only event + messageObj.header.name += messageObj.payload.listen_only ? "_true" : ""; + const userid = (messageObj.payload.user != null) ? messageObj.payload.user.userid : messageObj.payload.userid; + const extid = (messageObj.payload.user != null) ? messageObj.payload.user.extern_userid : ""; + this.mappedObject.data = { + "type": "event", + "id": this.mapInternalMessage(messageObj.header.name), + "attributes":{ + "meeting":{ + "internal-meeting-id": messageObj.payload.meeting_id, + "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) + }, + "user":{ + "internal-user-id": userid, + "external-user-id": extid + } + }, + "event":{ + "ts": messageObj.header.current_time + } + }; + if (messageObj.header.name === "user_joined_message") { + this.mappedObject.data.attributes.user = { + "internal-user-id": messageObj.payload.user.userid, + "external-user-id": messageObj.payload.user.extern_userid, + "name": messageObj.payload.user.name, + "role": messageObj.payload.user.role, + "presenter": messageObj.payload.user.presenter, + "sharing-mic": messageObj.payload.user.voiceUser.joined, + "sharing-video": messageObj.payload.user.has_stream, + "listening-only": messageObj.payload.user.listenOnly + }; + } + this.mappedMessage = JSON.stringify(this.mappedObject); + Logger.info("[MessageMapping] Mapped message:", this.mappedMessage); + } + + // Map internal to external message for chat information + chatTemplate(messageObj) { + this.mappedObject.data = { + "type": "event", + "id": this.mapInternalMessage(messageObj.header.name), + "attributes":{ + "meeting":{ + "internal-meeting-id": messageObj.payload.meeting_id, + "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) + }, + "chat-message":{ + "message": messageObj.payload.message.message, + "sender":{ + "internal-user-id": messageObj.payload.message.fromUserID, + "external-user-id": messageObj.payload.message.fromUsername, + "timezone-offset": messageObj.payload.message.fromTimeZoneOffset, + "time": messageObj.payload.message.fromTime + } + } + }, + "event":{ + "ts": messageObj.header.current_time + } + }; + if (messageObj.header.name.indexOf("private") !== -1) { + this.mappedObject.data.attributes["chat-message"].receiver = { + "internal-user-id": messageObj.payload.message.toUserID, + "external-user-id": messageObj.payload.message.toUsername + }; + } + this.mappedMessage = JSON.stringify(this.mappedObject); + Logger.info("[MessageMapping] Mapped message:", this.mappedMessage); + } + + rapTemplate(messageObj) { + this.mappedObject.data = { + "type": "event", + "id": this.mapInternalMessage(messageObj.header.name), + "attributes":{ + "meeting":{ + "internal-meeting-id": messageObj.payload.meeting_id, + "external-meeting-id": IDMapping.getExternalMeetingID(messageObj.payload.meeting_id) + } + }, + "event":{ + "ts": messageObj.header.current_time + } + }; + this.mappedMessage = JSON.stringify(this.mappedObject); + Logger.info("[MessageMapping] Mapped message:", this.mappedMessage); + } + + + mapInternalMessage(message) { + const mappedMsg = (() => { switch (message) { + case "meeting_created_message": return "meeting-created"; + case "meeting_destroyed_event": return "meeting-ended"; + case "user_joined_message": return "user-joined"; + case "user_left_message": return "user-left"; + case "user_listening_only_true": return "user-audio-listen-only-enabled"; + case "user_listening_only": return "user-audio-listen-only-disabled"; + case "user_joined_voice_message": return "user-audio-voice-enabled"; + case "user_left_voice_message": return "user-audio-voice-disabled"; + case "send_public_chat_message": return "chat-public-message-sent"; + case "send_private_chat_message": return "chat-private-message-sent"; + case "archive_started": return "rap-archive-started"; + case "archive_ended": return "rap-archive-ended"; + case "sanity_started": return "rap-sanity-started"; + case "sanity_ended": return "rap-sanity-ended"; + case "post_archive_started": return "rap-post-archive-started"; + case "post_archive_ended": return "rap-post-archive-ended"; + case "process_started": return "rap-process-started"; + case "process_ended": return "rap-process-ended"; + case "post_process_started": return "rap-post-process-started"; + case "post_process_ended": return "rap-post-process-ended"; + case "publish_started": return "rap-publish-started"; + case "publish_ended": return "rap-publish-ended"; + case "post_publish_started": return "rap-post-publish-started"; + case "post_publish_ended": return "rap-post-publish-ended"; + } })(); + return mappedMsg; + } +}); diff --git a/bbb-webhooks/utils.coffee b/bbb-webhooks/utils.coffee deleted file mode 100644 index f0160b6761351c439c9598a8ebcb8558091e00c1..0000000000000000000000000000000000000000 --- a/bbb-webhooks/utils.coffee +++ /dev/null @@ -1,62 +0,0 @@ -sha1 = require("sha1") -url = require("url") - -config = require("./config") - -Utils = exports - -# Calculates the checksum given a url `fullUrl` and a `salt`, as calculate by bbb-web. -Utils.checksumAPI = (fullUrl, salt) -> - query = Utils.queryFromUrl(fullUrl) - method = Utils.methodFromUrl(fullUrl) - Utils.checksum(method + query + salt) - -# Calculates the checksum for a string. -# Just a wrapper for the method that actually does it. -Utils.checksum = (string) -> - sha1(string) - -# Get the query of an API call from the url object (from url.parse()) -# Example: -# -# * `fullUrl` = `http://bigbluebutton.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo` -# * returns: `name=Demo+Meeting&meetingID=Demo` -Utils.queryFromUrl = (fullUrl) -> - - # Returns the query without the checksum. - # We can't use url.parse() because it would change the encoding - # and the checksum wouldn't match. We need the url exactly as - # the client sent us. - query = fullUrl.replace(/&checksum=[^&]*/, '') - query = query.replace(/checksum=[^&]*&/, '') - query = query.replace(/checksum=[^&]*$/, '') - matched = query.match(/\?(.*)/) - if matched? - matched[1] - else - '' - -# Get the method name of an API call from the url object (from url.parse()) -# Example: -# -# * `fullUrl` = `http://mconf.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo` -# * returns: `create` -Utils.methodFromUrl = (fullUrl) -> - urlObj = url.parse(fullUrl, true) - urlObj.pathname.substr (config.bbb.apiPath + "/").length - -# Returns the IP address of the client that made a request `req`. -# If can not determine the IP, returns `127.0.0.1`. -Utils.ipFromRequest = (req) -> - - # the first ip in the list if the ip of the client - # the others are proxys between him and us - if req.headers?["x-forwarded-for"]? - ips = req.headers["x-forwarded-for"].split(",") - ipAddress = ips[0]?.trim() - - # fallbacks - ipAddress ||= req.headers?["x-real-ip"] # when behind nginx - ipAddress ||= req.connection?.remoteAddress - ipAddress ||= "127.0.0.1" - ipAddress diff --git a/bbb-webhooks/utils.js b/bbb-webhooks/utils.js new file mode 100644 index 0000000000000000000000000000000000000000..a7e0b4a7d7b339f900a33892d4520bcdc39c0f55 --- /dev/null +++ b/bbb-webhooks/utils.js @@ -0,0 +1,69 @@ +"use strict"; +const sha1 = require("sha1"); +const url = require("url"); + +const config = require("./config"); + +const Utils = exports; + +// Calculates the checksum given a url `fullUrl` and a `salt`, as calculate by bbb-web. +Utils.checksumAPI = function(fullUrl, salt) { + const query = Utils.queryFromUrl(fullUrl); + const method = Utils.methodFromUrl(fullUrl); + return Utils.checksum(method + query + salt); +}; + +// Calculates the checksum for a string. +// Just a wrapper for the method that actually does it. +Utils.checksum = string => sha1(string); + +// Get the query of an API call from the url object (from url.parse()) +// Example: +// +// * `fullUrl` = `http://bigbluebutton.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo` +// * returns: `name=Demo+Meeting&meetingID=Demo` +Utils.queryFromUrl = function(fullUrl) { + + // Returns the query without the checksum. + // We can't use url.parse() because it would change the encoding + // and the checksum wouldn't match. We need the url exactly as + // the client sent us. + let query = fullUrl.replace(/&checksum=[^&]*/, ''); + query = query.replace(/checksum=[^&]*&/, ''); + query = query.replace(/checksum=[^&]*$/, ''); + const matched = query.match(/\?(.*)/); + if (matched != null) { + return matched[1]; + } else { + return ''; + } +}; + +// Get the method name of an API call from the url object (from url.parse()) +// Example: +// +// * `fullUrl` = `http://mconf.org/bigbluebutton/api/create?name=Demo+Meeting&meetingID=Demo` +// * returns: `create` +Utils.methodFromUrl = function(fullUrl) { + const urlObj = url.parse(fullUrl, true); + return urlObj.pathname.substr((config.bbb.apiPath + "/").length); +}; + +// Returns the IP address of the client that made a request `req`. +// If can not determine the IP, returns `127.0.0.1`. +Utils.ipFromRequest = function(req) { + + // the first ip in the list if the ip of the client + // the others are proxys between him and us + let ipAddress; + if ((req.headers != null ? req.headers["x-forwarded-for"] : undefined) != null) { + let ips = req.headers["x-forwarded-for"].split(","); + ipAddress = ips[0] != null ? ips[0].trim() : undefined; + } + + // fallbacks + if (!ipAddress) { ipAddress = req.headers != null ? req.headers["x-real-ip"] : undefined; } // when behind nginx + if (!ipAddress) { ipAddress = req.connection != null ? req.connection.remoteAddress : undefined; } + if (!ipAddress) { ipAddress = "127.0.0.1"; } + return ipAddress; +}; diff --git a/bbb-webhooks/web_hooks.coffee b/bbb-webhooks/web_hooks.coffee deleted file mode 100644 index ea70743efa0b9e6c22d8abb9332660de30ae00f5..0000000000000000000000000000000000000000 --- a/bbb-webhooks/web_hooks.coffee +++ /dev/null @@ -1,105 +0,0 @@ -_ = require("lodash") -async = require("async") -redis = require("redis") -request = require("request") - -config = require("./config") -Hook = require("./hook") -IDMapping = require("./id_mapping") -Logger = require("./logger") -MessageMapping = require("./messageMapping") - -# Web hooks will listen for events on redis coming from BigBlueButton and -# perform HTTP calls with them to all registered hooks. -module.exports = class WebHooks - - constructor: -> - @subscriberEvents = redis.createClient() - - start: -> - @_subscribeToEvents() - - # Subscribe to the events on pubsub that might need to be sent in callback calls. - _subscribeToEvents: -> - @subscriberEvents.on "psubscribe", (channel, count) -> - Logger.info "[WebHooks] subscribed to:" + channel - - @subscriberEvents.on "pmessage", (pattern, channel, message) => - - processMessage = => - Logger.info "[WebHooks] processing message on [#{channel}]:", JSON.stringify(message) - @_processEvent(message, raw) - - try - raw = JSON.parse(message) - messageMapped = new MessageMapping() - messageMapped.mapMessage(JSON.parse(message)) - message = messageMapped.mappedObject - if not _.isEmpty(message) - id = message.data.attributes.meeting["internal-meeting-id"] - IDMapping.reportActivity(id) - - # First treat meeting events to add/remove ID mappings - if message.data?.id is "meeting-created" - Logger.info "[WebHooks] got create message on meetings channel [#{channel}]:", message - IDMapping.addOrUpdateMapping message.data.attributes.meeting["internal-meeting-id"], message.data.attributes.meeting["external-meeting-id"], (error, result) -> - # has to be here, after the meeting was created, otherwise create calls won't generate - # callback calls for meeting hooks - processMessage() - - # TODO: Temporarily commented because we still need the mapping for recording events, - # after the meeting ended. - # else if message.header?.name is "meeting_destroyed_event" - # Logger.info "[WebHooks] got destroy message on meetings channel [#{channel}]", message - # IDMapping.removeMapping message.payload?.meeting_id, (error, result) -> - # processMessage() - - else - processMessage() - else - @_processRaw(raw) - catch e - Logger.error "[WebHooks] error processing the message:", JSON.stringify(raw), ":", e - - @subscriberEvents.psubscribe config.hooks.pchannel - - # Send raw data to hooks that are not expecting mapped messages - _processRaw: (message) -> - hooks = Hook.allGlobalSync() - - # Add hooks for the specific meeting that expect raw data - if message[config.webhooks.rawPath] is not null - # Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis (configurable) - switch config.webhooks.rawPath - when "payload" then idFromMessage = message[config.webhooks.rawPath][config.webhooks.meetingID] - when "data" then idFromMessage = message[config.webhooks.rawPath].attributes.meeting[config.webhooks.meetingID] - if idFromMessage? - eMeetingID = IDMapping.getExternalMeetingID(idFromMessage) - hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID)) - # Notify the hooks that expect raw data - async.forEach hooks, (hook) -> - Logger.info "[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL if hook.getRaw - hook.enqueue message if hook.getRaw - - # Processes an event received from redis. Will get all hook URLs that - # should receive this event and start the process to perform the callback. - _processEvent: (message, raw) -> - # Get all global hooks - hooks = Hook.allGlobalSync() - - # filter the hooks that need to receive this event - # add hooks that are registered for this specific meeting - idFromMessage = message.data?.attributes.meeting["internal-meeting-id"] - if idFromMessage? - eMeetingID = IDMapping.getExternalMeetingID(idFromMessage) - hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID)) - - # Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message - async.forEach hooks, (hook) -> - Logger.info "[WebHooks] enqueueing a message in the hook:", hook.callbackURL if not hook.getRaw - hook.enqueue message if not hook.getRaw - - sendRaw = hooks.some (hook) -> - return hook.getRaw - if sendRaw - @_processRaw(raw) diff --git a/bbb-webhooks/web_hooks.js b/bbb-webhooks/web_hooks.js new file mode 100644 index 0000000000000000000000000000000000000000..7f47f8445ce72697728139c675e8c109bdda20a3 --- /dev/null +++ b/bbb-webhooks/web_hooks.js @@ -0,0 +1,125 @@ +"use strict"; +let WebHooks; +const _ = require("lodash"); +const async = require("async"); +const redis = require("redis"); +const request = require("request"); +const config = require("./config.js"); +const Hook = require("./hook.js"); +const IDMapping = require("./id_mapping.js"); +const Logger = require("./logger.js"); +const MessageMapping = require("./messageMapping.js"); + +// Web hooks will listen for events on redis coming from BigBlueButton and +// perform HTTP calls with them to all registered hooks. +module.exports = (WebHooks = class WebHooks { + + constructor() { + this.subscriberEvents = redis.createClient(); + } + + start() { + this._subscribeToEvents(); + } + + // Subscribe to the events on pubsub that might need to be sent in callback calls. + _subscribeToEvents() { + this.subscriberEvents.on("psubscribe", (channel, count) => Logger.info(`[WebHooks] subscribed to:${channel}`)); + + this.subscriberEvents.on("pmessage", (pattern, channel, message) => { + + let raw; + const processMessage = () => { + Logger.info(`[WebHooks] processing message on [${channel}]:`, JSON.stringify(message)); + this._processEvent(message, raw); + }; + + try { + raw = JSON.parse(message); + let messageMapped = new MessageMapping(); + messageMapped.mapMessage(JSON.parse(message)); + message = messageMapped.mappedObject; + if (!_.isEmpty(message) && !config.hooks.getRaw) { + const id = message.data.attributes.meeting["internal-meeting-id"]; + IDMapping.reportActivity(id); + + // First treat meeting events to add/remove ID mappings + if ((message.data != null ? message.data.id : undefined) === "meeting-created") { + Logger.info(`[WebHooks] got create message on meetings channel [${channel}]:`, message); + IDMapping.addOrUpdateMapping(message.data.attributes.meeting["internal-meeting-id"], message.data.attributes.meeting["external-meeting-id"], (error, result) => + // has to be here, after the meeting was created, otherwise create calls won't generate + // callback calls for meeting hooks + processMessage() + ); + + // TODO: Temporarily commented because we still need the mapping for recording events, + // after the meeting ended. + // else if message.header?.name is "meeting_destroyed_event" + // Logger.info "[WebHooks] got destroy message on meetings channel [#{channel}]", message + // IDMapping.removeMapping message.payload?.meeting_id, (error, result) -> + // processMessage() + + } else { + processMessage(); + } + } else { + this._processRaw(raw); + } + } catch (e) { + Logger.error("[WebHooks] error processing the message:", JSON.stringify(raw), ":", e); + } + }); + + this.subscriberEvents.psubscribe(config.hooks.pchannel); + } + + // Send raw data to hooks that are not expecting mapped messages + _processRaw(message) { + let idFromMessage; + let hooks = Hook.allGlobalSync(); + + // Add hooks for the specific meeting that expect raw data + if (message[config.webhooks.rawPath] != null) { + // Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis (configurable) + switch (config.webhooks.rawPath) { + case "payload": idFromMessage = message[config.webhooks.rawPath][config.webhooks.meetingID]; break; + case "data": idFromMessage = message[config.webhooks.rawPath].attributes.meeting[config.webhooks.meetingID]; break; + } + } + if (idFromMessage != null) { + const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage); + hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID)); + } + // Notify the hooks that expect raw data + async.forEach(hooks, function(hook) { + if (hook.getRaw) { Logger.info("[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL); } + if (hook.getRaw) { hook.enqueue(message); } + }); + } + + // Processes an event received from redis. Will get all hook URLs that + // should receive this event and start the process to perform the callback. + _processEvent(message, raw) { + // Get all global hooks + let hooks = Hook.allGlobalSync(); + + // filter the hooks that need to receive this event + // add hooks that are registered for this specific meeting + const idFromMessage = message.data != null ? message.data.attributes.meeting["internal-meeting-id"] : undefined; + if (idFromMessage != null) { + const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage); + hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID)); + } + + // Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message + async.forEach(hooks, function(hook) { + if (!hook.getRaw) { Logger.info("[WebHooks] enqueueing a message in the hook:", hook.callbackURL); } + if (!hook.getRaw) { hook.enqueue(message); } + }); + + const sendRaw = hooks.some(hook => { return hook.getRaw }); + if (sendRaw) { + this._processRaw(raw); + } + } +}); diff --git a/bbb-webhooks/web_server.coffee b/bbb-webhooks/web_server.coffee deleted file mode 100644 index 26a3c170a2443f67f8eace791582081cb454a570..0000000000000000000000000000000000000000 --- a/bbb-webhooks/web_server.coffee +++ /dev/null @@ -1,138 +0,0 @@ -_ = require("lodash") -express = require("express") -url = require("url") - -config = require("./config") -Hook = require("./hook") -Logger = require("./logger") -Utils = require("./utils") - -# Web server that listens for API calls and process them. -module.exports = class WebServer - - constructor: -> - @app = express() - @_registerRoutes() - - start: (port) -> - @server = @app.listen(port) - unless @server.address()? - Logger.error "[WebServer] aborting, could not bind to port", port, - process.exit(1) - Logger.info "[WebServer] listening on port", port, "in", @app.settings.env.toUpperCase(), "mode" - - _registerRoutes: -> - # Request logger - @app.all "*", (req, res, next) -> - unless fromMonit(req) - Logger.info "[WebServer]", req.method, "request to", req.url, "from:", clientDataSimple(req) - next() - - @app.get "/bigbluebutton/api/hooks/create", @_validateChecksum, @_create - @app.get "/bigbluebutton/api/hooks/destroy", @_validateChecksum, @_destroy - @app.get "/bigbluebutton/api/hooks/list", @_validateChecksum, @_list - @app.get "/bigbluebutton/api/hooks/ping", (req, res) -> - res.write "bbb-webhooks up!" - res.end() - - _create: (req, res, next) -> - urlObj = url.parse(req.url, true) - callbackURL = urlObj.query["callbackURL"] - meetingID = urlObj.query["meetingID"] - getRaw = JSON.parse(urlObj.query["getRaw"].toLowerCase()) - - unless callbackURL? - respondWithXML(res, config.api.responses.missingParamCallbackURL) - else - Hook.addSubscription callbackURL, meetingID, getRaw, (error, hook) -> - if error? # the only error for now is for duplicated callbackURL - msg = config.api.responses.createDuplicated(hook.id) - else if hook? - msg = config.api.responses.createSuccess(hook.id, hook.permanent, hook.getRaw) - else - msg = config.api.responses.createFailure - respondWithXML(res, msg) - # Create a permanent hook. Permanent hooks can't be deleted via API and will try to emit a message until it succeed - createPermanent: -> - Hook.addSubscription config.hooks.aggr, null, config.hooks.getRaw, (error, hook) -> - if error? # there probably won't be any errors here - Logger.info "[WebServer] duplicated permanent hook", error - else if hook? - Logger.info "[WebServer] permanent hook created successfully" - else - Logger.info "[WebServer] error creating permanent hook" - - _destroy: (req, res, next) -> - urlObj = url.parse(req.url, true) - hookID = urlObj.query["hookID"] - - unless hookID? - respondWithXML(res, config.api.responses.missingParamHookID) - else - Hook.removeSubscription hookID, (error, result) -> - if error? - msg = config.api.responses.destroyFailure - else if !result - msg = config.api.responses.destroyNoHook - else - msg = config.api.responses.destroySuccess - respondWithXML(res, msg) - - _list: (req, res, next) -> - urlObj = url.parse(req.url, true) - meetingID = urlObj.query["meetingID"] - - if meetingID? - # all the hooks that receive events from this meeting - hooks = Hook.allGlobalSync() - hooks = hooks.concat(Hook.findByExternalMeetingIDSync(meetingID)) - hooks = _.sortBy(hooks, (hook) -> hook.id) - else - # no meetingID, return all hooks - hooks = Hook.allSync() - - msg = "<response><returncode>SUCCESS</returncode><hooks>" - hooks.forEach (hook) -> - msg += "<hook>" - msg += "<hookID>#{hook.id}</hookID>" - msg += "<callbackURL><![CDATA[#{hook.callbackURL}]]></callbackURL>" - msg += "<meetingID><![CDATA[#{hook.externalMeetingID}]]></meetingID>" unless hook.isGlobal() - msg += "<permanentHook>#{hook.permanent}</permanentHook>" - msg += "<rawData>#{hook.getRaw}</rawData>" - msg += "</hook>" - msg += "</hooks></response>" - - respondWithXML(res, msg) - - # Validates the checksum in the request `req`. - # If it doesn't match BigBlueButton's shared secret, will send an XML response - # with an error code just like BBB does. - _validateChecksum: (req, res, next) => - urlObj = url.parse(req.url, true) - checksum = urlObj.query["checksum"] - - if checksum is Utils.checksumAPI(req.url, config.bbb.sharedSecret) - next() - else - Logger.info "[WebServer] checksum check failed, sending a checksumError response" - res.setHeader("Content-Type", "text/xml") - res.send cleanupXML(config.api.responses.checksumError) - -respondWithXML = (res, msg) -> - msg = cleanupXML(msg) - Logger.info "[WebServer] respond with:", msg - res.setHeader("Content-Type", "text/xml") - res.send msg - -# Returns a simple string with a description of the client that made -# the request. It includes the IP address and the user agent. -clientDataSimple = (req) -> - "ip " + Utils.ipFromRequest(req) + ", using " + req.headers["user-agent"] - -# Cleans up a string with an XML in it removing spaces and new lines from between the tags. -cleanupXML = (string) -> - string.trim().replace(/>\s*/g, '>') - -# Was this request made by monit? -fromMonit = (req) -> - req.headers["user-agent"]? and req.headers["user-agent"].match(/^monit/) diff --git a/bbb-webhooks/web_server.js b/bbb-webhooks/web_server.js new file mode 100644 index 0000000000000000000000000000000000000000..b99c5b5e053392bf8767552d5248b640bf5518bf --- /dev/null +++ b/bbb-webhooks/web_server.js @@ -0,0 +1,166 @@ +"use strict"; +let WebServer; +const _ = require("lodash"); +const express = require("express"); +const url = require("url"); + +const config = require("./config.js"); +const Hook = require("./hook.js"); +const Logger = require("./logger.js"); +const Utils = require("./utils.js"); + +// Web server that listens for API calls and process them. +module.exports = (WebServer = class WebServer { + + constructor() { + this._validateChecksum = this._validateChecksum.bind(this); + this.app = express(); + this._registerRoutes(); + } + + start(port) { + this.server = this.app.listen(port); + if (this.server.address() == null) { + Logger.error("[WebServer] aborting, could not bind to port", port, + process.exit(1)); + } + Logger.info("[WebServer] listening on port", port, "in", this.app.settings.env.toUpperCase(), "mode"); + } + + _registerRoutes() { + // Request logger + this.app.all("*", function(req, res, next) { + if (!fromMonit(req)) { + Logger.info("[WebServer]", req.method, "request to", req.url, "from:", clientDataSimple(req)); + } + next(); + }); + + this.app.get("/bigbluebutton/api/hooks/create", this._validateChecksum, this._create); + this.app.get("/bigbluebutton/api/hooks/destroy", this._validateChecksum, this._destroy); + this.app.get("/bigbluebutton/api/hooks/list", this._validateChecksum, this._list); + this.app.get("/bigbluebutton/api/hooks/ping", function(req, res) { + res.write("bbb-webhooks up!"); + res.end(); + }); + } + + _create(req, res, next) { + const urlObj = url.parse(req.url, true); + const callbackURL = urlObj.query["callbackURL"]; + const meetingID = urlObj.query["meetingID"]; + const getRaw = JSON.parse(urlObj.query["getRaw"].toLowerCase()); + + if (callbackURL == null) { + respondWithXML(res, config.api.responses.missingParamCallbackURL); + } else { + Hook.addSubscription(callbackURL, meetingID, getRaw, function(error, hook) { + let msg; + if (error != null) { // the only error for now is for duplicated callbackURL + msg = config.api.responses.createDuplicated(hook.id); + } else if (hook != null) { + msg = config.api.responses.createSuccess(hook.id, hook.permanent, hook.getRaw); + } else { + msg = config.api.responses.createFailure; + } + respondWithXML(res, msg); + }); + } + } + // Create a permanent hook. Permanent hooks can't be deleted via API and will try to emit a message until it succeed + createPermanent() { + Hook.addSubscription(config.hooks.aggr, null, config.hooks.getRaw, function(error, hook) { + if (error != null) { // there probably won't be any errors here + Logger.info("[WebServer] duplicated permanent hook", error); + } else if (hook != null) { + Logger.info("[WebServer] permanent hook created successfully"); + } else { + Logger.info("[WebServer] error creating permanent hook"); + } + }); + } + + _destroy(req, res, next) { + const urlObj = url.parse(req.url, true); + const hookID = urlObj.query["hookID"]; + + if (hookID == null) { + respondWithXML(res, config.api.responses.missingParamHookID); + } else { + Hook.removeSubscription(hookID, function(error, result) { + let msg; + if (error != null) { + msg = config.api.responses.destroyFailure; + } else if (!result) { + msg = config.api.responses.destroyNoHook; + } else { + msg = config.api.responses.destroySuccess; + } + respondWithXML(res, msg); + }); + } + } + + _list(req, res, next) { + let hooks; + const urlObj = url.parse(req.url, true); + const meetingID = urlObj.query["meetingID"]; + + if (meetingID != null) { + // all the hooks that receive events from this meeting + hooks = Hook.allGlobalSync(); + hooks = hooks.concat(Hook.findByExternalMeetingIDSync(meetingID)); + hooks = _.sortBy(hooks, hook => hook.id); + } else { + // no meetingID, return all hooks + hooks = Hook.allSync(); + } + + let msg = "<response><returncode>SUCCESS</returncode><hooks>"; + hooks.forEach(function(hook) { + msg += "<hook>"; + msg += `<hookID>${hook.id}</hookID>`; + msg += `<callbackURL><![CDATA[${hook.callbackURL}]]></callbackURL>`; + if (!hook.isGlobal()) { msg += `<meetingID><![CDATA[${hook.externalMeetingID}]]></meetingID>`; } + msg += `<permanentHook>${hook.permanent}</permanentHook>`; + msg += `<rawData>${hook.getRaw}</rawData>`; + msg += "</hook>"; + }); + msg += "</hooks></response>"; + + respondWithXML(res, msg); + } + + // Validates the checksum in the request `req`. + // If it doesn't match BigBlueButton's shared secret, will send an XML response + // with an error code just like BBB does. + _validateChecksum(req, res, next) { + const urlObj = url.parse(req.url, true); + const checksum = urlObj.query["checksum"]; + + if (checksum === Utils.checksumAPI(req.url, config.bbb.sharedSecret)) { + next(); + } else { + Logger.info("[WebServer] checksum check failed, sending a checksumError response"); + res.setHeader("Content-Type", "text/xml"); + res.send(cleanupXML(config.api.responses.checksumError)); + } + } +}); + +var respondWithXML = function(res, msg) { + msg = cleanupXML(msg); + Logger.info("[WebServer] respond with:", msg); + res.setHeader("Content-Type", "text/xml"); + res.send(msg); +}; + +// Returns a simple string with a description of the client that made +// the request. It includes the IP address and the user agent. +var clientDataSimple = req => `ip ${Utils.ipFromRequest(req)}, using ${req.headers["user-agent"]}`; + +// Cleans up a string with an XML in it removing spaces and new lines from between the tags. +var cleanupXML = string => string.trim().replace(/>\s*/g, '>'); + +// Was this request made by monit? +var fromMonit = req => (req.headers["user-agent"] != null) && req.headers["user-agent"].match(/^monit/);