Skip to content
Snippets Groups Projects
web_hooks.js 5.72 KiB
const _ = require("lodash");
const async = require("async");
const redis = require("redis");
const request = require("request");
const config = require("config");
const Hook = require("./hook.js");
const IDMapping = require("./id_mapping.js");
const Logger = require("./logger.js");
const MessageMapping = require("./messageMapping.js");
const UserMapping = require("./userMapping.js");

// Web hooks will listen for events on redis coming from BigBlueButton and
// perform HTTP calls with them to all registered hooks.
module.exports = class WebHooks {

  constructor() {
    this.subscriberEvents = Application.redisPubSubClient();
  }

  start(callback) {
    this._subscribeToEvents();
    typeof callback === 'function' ? callback(null,"w") : undefined;
  }

  // Subscribe to the events on pubsub that might need to be sent in callback calls.
  _subscribeToEvents() {
    this.subscriberEvents.on("psubscribe", (channel, count) => Logger.info(`[WebHooks] subscribed to:${channel}`));

    this.subscriberEvents.on("pmessage", (pattern, channel, message) => {

      let raw;
      const processMessage = () => {
        Logger.info(`[WebHooks] processing message on [${channel}]:`, JSON.stringify(message));
        this._processEvent(message, raw);
      };

      try {
        raw = JSON.parse(message);
        let messageMapped = new MessageMapping();
        messageMapped.mapMessage(JSON.parse(message));
        message = messageMapped.mappedObject;
        if (!_.isEmpty(message)) {
          const intId = message.data.attributes.meeting["internal-meeting-id"];
          IDMapping.reportActivity(intId);

          // First treat meeting events to add/remove ID mappings
          switch (message.data.id) {
            case "meeting-created":
              Logger.info(`[WebHooks] got create message on meetings channel [${channel}]:`, message);
              IDMapping.addOrUpdateMapping(intId, message.data.attributes.meeting["external-meeting-id"], (error, result) => {
              // has to be here, after the meeting was created, otherwise create calls won't generate
              // callback calls for meeting hooks
                processMessage();
              });
              break;
            case "user-joined":
              UserMapping.addOrUpdateMapping(message.data.attributes.user["internal-user-id"],message.data.attributes.user["external-user-id"], intId, message.data.attributes.user, () => {
                processMessage();
              });
              break;
            case "user-left":
              UserMapping.removeMapping(message.data.attributes.user["internal-user-id"], () => { processMessage(); });
              break;
            case "meeting-ended":
              UserMapping.removeMappingMeetingId(intId, () => { processMessage(); });
              break;
            default:
              processMessage();
          }
        }
      } catch (e) {
        Logger.error("[WebHooks] error processing the message:", JSON.stringify(raw), ":", e.message);
      }
    });

    config.get("hooks.channels").forEach((channel) => {
      this.subscriberEvents.psubscribe(channel);
    });

  }

  // Send raw data to hooks that are not expecting mapped messages
  _processRaw(message) {
    let idFromMessage;
    let hooks = Hook.allGlobalSync();

    // Add hooks for the specific meeting that expect raw data
    // Get meetingId for a raw message that was previously mapped by another webhook application or if it's straight from redis
    idFromMessage = this._findMeetingID(message);
    if (idFromMessage != null) {
      const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
      hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
      // Notify the hooks that expect raw data
      async.forEach(hooks, (hook) => {
        if (hook.getRaw) {
          Logger.info("[WebHooks] enqueueing a raw message in the hook:", hook.callbackURL);
          hook.enqueue(message);
        }
      });
    } // Put foreach inside the if to avoid pingpong events
  }

  _findMeetingID(message) {
    if (message.data) {
      return message.data.attributes.meeting["internal-meeting-id"];
    }
    if (message.payload) {
      return message.payload.meeting_id;
    }
    if (message.envelope && message.envelope.routing && message.envelope.routing.meetingId) {
      return message.envelope.routing.meetingId;
    }
    if (message.header && message.header.body && message.header.body.meetingId) {
      return message.header.body.meetingId;
    }
    if (message.core && message.core.body) {
      return message.core.body.props ? message.core.body.props.meetingProp.intId : message.core.body.meetingId;
    }
    return undefined;
  }

  // Processes an event received from redis. Will get all hook URLs that
  // should receive this event and start the process to perform the callback.
  _processEvent(message, raw) {
    // Get all global hooks
    let hooks = Hook.allGlobalSync();

    // filter the hooks that need to receive this event
    // add hooks that are registered for this specific meeting
    const idFromMessage = message.data != null ? message.data.attributes.meeting["internal-meeting-id"] : undefined;
    if (idFromMessage != null) {
      const eMeetingID = IDMapping.getExternalMeetingID(idFromMessage);
      hooks = hooks.concat(Hook.findByExternalMeetingIDSync(eMeetingID));
    }

    // Notify every hook asynchronously, if hook N fails, it won't block hook N+k from receiving its message
    async.forEach(hooks, (hook) => {
      if (!hook.getRaw) {
        Logger.info("[WebHooks] enqueueing a message in the hook:", hook.callbackURL);
        hook.enqueue(message);
      }
    });

    const sendRaw = hooks.some(hook => { return hook.getRaw });
    if (sendRaw && config.get("hooks.getRaw")) {
      this._processRaw(raw);
    }
  }
};