Skip to content
Snippets Groups Projects
Commit fa6a1be5 authored by Richard Alam's avatar Richard Alam
Browse files

- received redis pubsub messsages

parent 6f10140e
No related branches found
No related tags found
No related merge requests found
package org.bigbluebutton.app.video;
import java.util.HashMap;
import java.util.Map;
public class Meeting {
public final String id;
private Map<String, VideoStream> videoStreams = new HashMap<String, VideoStream>();
public Meeting(String id) {
this.id = id;
}
public synchronized void addStream(VideoStream stream) {
videoStreams.put(stream.getStreamId(), stream);
}
public synchronized void removeStream(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
}
public synchronized void streamBroadcastClose(String streamId) {
VideoStream vs = videoStreams.remove(streamId);
if (vs != null) {
vs.streamBroadcastClose();
}
}
public synchronized boolean hasVideoStreams() {
return !videoStreams.isEmpty();
}
public synchronized void stopStartRecording(String streamId) {
VideoStream vs = videoStreams.get(streamId);
if (vs != null) vs.stopStartRecording();
}
public synchronized void stopStartAllRecordings() {
for (VideoStream vs : videoStreams.values()) {
stopStartRecording(vs.getStreamId());
}
}
}
......@@ -13,17 +13,47 @@ public class MeetingManager {
this.app = app;
}
public void add(String id) {
Meeting m = new Meeting(id);
private void add(Meeting m) {
meetings.put(m.id, m);
}
public void remove(String id) {
private void remove(String id) {
Meeting m = meetings.remove(id);
}
public void addStream(String meetingId, VideoStream vs) {
Meeting m = meetings.get(meetingId);
if (m != null) {
// Close all streams;
m.addStream(vs);
} else {
Meeting nm = new Meeting(meetingId);
nm.addStream(vs);
add(m);
}
}
public void removeStream(String meetingId, String streamId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.removeStream(streamId);
}
}
public void streamBroadcastClose(String meetingId, String streamId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.streamBroadcastClose(streamId);
if (!m.hasVideoStreams()) {
remove(m.id);
}
}
}
public synchronized void stopStartAllRecordings(String meetingId) {
Meeting m = meetings.get(meetingId);
if (m != null) {
m.stopStartAllRecordings();
}
}
}
......@@ -32,7 +32,6 @@ import org.red5.server.api.Red5;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.api.stream.IPlayItem;
import org.red5.server.api.stream.IServerStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.ISubscriberStream;
import org.red5.server.scheduling.QuartzSchedulingService;
......@@ -65,6 +64,8 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
private final Map<String, VideoRotator> videoRotators = new HashMap<String, VideoRotator>();
private MeetingManager meetingManager;
@Override
public boolean appStart(IScope app) {
super.appStart(app);
......@@ -248,7 +249,6 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
String meetingId = conn.getScope().getName();
String streamId = stream.getPublishedName();
Matcher matcher = RECORD_STREAM_ID_PATTERN.matcher(stream.getPublishedName());
addH263PublishedStream(streamId);
if (streamId.contains("/")) {
......@@ -259,18 +259,17 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
}
} else if (matcher.matches()) {
log.info("Start recording of stream=[" + stream.getPublishedName() + "] for meeting=[" + conn.getScope().getName() + "]");
Boolean recordVideoStream = true;
VideoStreamListener listener = new VideoStreamListener(meetingId, streamId,
recordVideoStream, userId, packetTimeout, scheduler);
listener.setEventRecordingService(recordingService);
stream.addStreamListener(listener);
streamListeners.put(conn.getScope().getName() + "-" + stream.getPublishedName(), listener);
recordVideoStream, userId, packetTimeout, scheduler, recordingService);
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
VideoStream vstream = new VideoStream(stream, listener, cstream);
vstream.startRecording();
recordStream(stream);
meetingManager.addStream(meetingId, vstream);
}
}
private Long genTimestamp() {
......@@ -309,11 +308,8 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
}
removeH263PublishedStream(streamId);
if (matcher.matches()) {
IStreamListener listener = streamListeners.remove(scopeName + "-" + stream.getPublishedName());
if (listener != null) {
((VideoStreamListener) listener).streamStopped();
stream.removeStreamListener(listener);
}
meetingManager.streamBroadcastClose(meetingId, streamId);
meetingManager.removeStream(meetingId, streamId);
long publishDuration = (System.currentTimeMillis() - stream.getCreationTime()) / 1000;
log.info("Stop recording event for stream=[{}] meeting=[{}]", stream.getPublishedName(), scopeName);
......@@ -325,31 +321,9 @@ public class VideoApplication extends MultiThreadedApplicationAdapter {
event.put("duration", new Long(publishDuration).toString());
event.put("eventName", "StopWebcamShareEvent");
recordingService.record(scopeName, event);
}
}
/**
* A hook to record a stream. A file is written in webapps/video/streams/
* @param stream
*/
private void recordStream(IBroadcastStream stream) {
IConnection conn = Red5.getConnectionLocal();
long now = System.currentTimeMillis();
String recordingStreamName = stream.getPublishedName(); // + "-" + now; /** Comment out for now...forgot why I added this - ralam */
try {
log.info("Recording stream " + recordingStreamName);
ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName());
cstream.saveAs(recordingStreamName, false);
} catch (Exception e) {
log.error("ERROR while recording stream " + e.getMessage());
e.printStackTrace();
}
}
public void setPacketTimeout(int timeout) {
this.packetTimeout = timeout;
}
......
package org.bigbluebutton.app.video;
import org.red5.logging.Red5LoggerFactory;
import org.red5.server.api.IConnection;
import org.red5.server.api.Red5;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.stream.ClientBroadcastStream;
import org.slf4j.Logger;
public class VideoStream {
private static Logger log = Red5LoggerFactory.getLogger(VideoStream.class, "video");
private VideoStreamListener videoStreamListener;
private IScope scope;
private String streamId;
private IBroadcastStream stream;
private String recordingStreamName;
private ClientBroadcastStream cstream;
public VideoStream(IBroadcastStream stream, VideoStreamListener videoStreamListener, ClientBroadcastStream cstream) {
this.stream = stream;
this.videoStreamListener = videoStreamListener;
stream.addStreamListener(videoStreamListener);
this.cstream = cstream;
}
public String getStreamId() {
return streamId;
}
public synchronized void startRecording() {
long now = System.currentTimeMillis();
recordingStreamName = stream.getPublishedName() + "-" + now;
try {
log.info("Recording stream " + recordingStreamName);
cstream.saveAs(recordingStreamName, false);
} catch (Exception e) {
log.error("ERROR while recording stream " + e.getMessage());
e.printStackTrace();
}
}
public synchronized void stopRecording() {
if (cstream.isRecording()) {
cstream.stopRecording();
videoStreamListener.reset();
}
}
public synchronized void stopStartRecording() {
stopRecording();
startRecording();
}
public synchronized void streamBroadcastClose() {
stopRecording();
videoStreamListener.streamStopped();
stream.removeStreamListener(videoStreamListener);
}
}
......@@ -86,20 +86,25 @@ public class VideoStreamListener implements IStreamListener {
public VideoStreamListener(String meetingId, String streamId, Boolean record,
String userId, int packetTimeout,
QuartzSchedulingService scheduler) {
QuartzSchedulingService scheduler,
EventRecordingService recordingService) {
this.meetingId = meetingId;
this.streamId = streamId;
this.record = record;
this.videoTimeout = packetTimeout;
this.userId = userId;
this.scheduler = scheduler;
this.recordingService = recordingService;
}
private Long genTimestamp() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
public void reset() {
firstPacketReceived = false;
}
@Override
public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
IoBuffer buf = packet.getData();
......@@ -158,11 +163,6 @@ public class VideoStreamListener implements IStreamListener {
}
}
public void setEventRecordingService(EventRecordingService s) {
recordingService = s;
}
public void streamStopped() {
this.publishing = false;
......
package org.bigbluebutton.red5.pubsub;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.bigbluebutton.app.video.MeetingManager;
import org.bigbluebutton.red5.pubsub.message.RecordChapterBreakMessage;
public class MeetingMessageHandler implements MessageHandler {
private final String HEADER = "header";
private final String NAME = "name";
private final String BODY = "body";
private final String MEETING_ID = "meetingId";
private final String TIMESTAMP = "timestamp";
private final String RecordingChapterBreakSysMsg = "RecordingChapterBreakSysMsg";
private MeetingManager meetingManager;
public void handleMessage(String pattern, String channel, String message) {
JsonParser parser = new JsonParser();
JsonObject obj = (JsonObject) parser.parse(message);
if (obj.has(HEADER) && obj.has(BODY)) {
JsonObject header = obj.getAsJsonObject(HEADER);
if (header.has(NAME)) {
String name = header.get(NAME).getAsString();
handle(name, obj.getAsJsonObject(BODY));
}
}
}
private void handle(String name, JsonObject body) {
if (RecordingChapterBreakSysMsg.equals(name)) {
if (body.has(MEETING_ID) && body.has(TIMESTAMP)) {
String meetingId = body.get(MEETING_ID).getAsString();
Long timestamp = body.get(TIMESTAMP).getAsLong();
RecordChapterBreakMessage chBreak = new RecordChapterBreakMessage(meetingId, timestamp);
meetingManager.stopStartAllRecordings(meetingId);
}
}
}
public void setMeetingManager(MeetingManager mgr) {
this.meetingManager = mgr;
}
}
......@@ -35,7 +35,7 @@ public class MessageReceiver {
jedis = new Jedis(host, port);
// Set the name of this client to be able to distinguish when doing
// CLIENT LIST on redis-cli
jedis.clientSetname("BbbWebSub");
jedis.clientSetname("BbbVideoSub");
Runnable messageReceiver = new Runnable() {
public void run() {
......@@ -78,6 +78,13 @@ public class MessageReceiver {
@Override
public void onMessage(String channel, String message) {
// Not used.
Runnable task = new Runnable() {
public void run() {
handler.handleMessage("", channel, message);
}
};
runExec.execute(task);
}
@Override
......
package org.bigbluebutton.red5.pubsub.message;
public class RecordChapterBreakMessage {
public final String meetingId;
public final Long timestamp;
public RecordChapterBreakMessage(String meetingId, Long timestamp) {
this.meetingId = meetingId;
this.timestamp = timestamp;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment