diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java index eacc68d51249a75922427aacf93f8fda7546ba74..55bfa8f19cc764102c49551cd2a45feab1672fe6 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java @@ -265,6 +265,7 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { VideoStreamListener listener = new VideoStreamListener(meetingId, streamId, recordVideoStream, userId, packetTimeout, scheduler, recordingService); ClientBroadcastStream cstream = (ClientBroadcastStream) this.getBroadcastStream(conn.getScope(), stream.getPublishedName()); + stream.addStreamListener(listener); VideoStream vstream = new VideoStream(stream, listener, cstream); vstream.startRecording(); diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStream.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStream.java index fa83c82a2e807e882d9859ff69e09c46c15587f6..9d99f139debe11a3202fec8bcfa0630376136a5b 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStream.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStream.java @@ -50,6 +50,7 @@ public class VideoStream { public synchronized void stopStartRecording() { stopRecording(); + videoStreamListener.reset(); startRecording(); } diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java index 2d20617ed4f2b8ca2eb0eaca7935d2165d3b5798..3d511b0fbd75d919639cdc8acb0f92a9ff6f8b54 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoStreamListener.java @@ -1,26 +1,26 @@ /** -* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ -* -* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). -* -* This program is free software; you can redistribute it and/or modify it under the -* terms of the GNU Lesser General Public License as published by the Free Software -* Foundation; either version 3.0 of the License, or (at your option) any later -* version. -* -* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY -* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A -* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. -* -* You should have received a copy of the GNU Lesser General Public License along -* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. -* -*/ + * BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ + * <p> + * Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). + * <p> + * This program is free software; you can redistribute it and/or modify it under the + * terms of the GNU Lesser General Public License as published by the Free Software + * Foundation; either version 3.0 of the License, or (at your option) any later + * version. + * <p> + * BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY + * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A + * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + * <p> + * You should have received a copy of the GNU Lesser General Public License along + * with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. + */ package org.bigbluebutton.app.video; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; + import org.apache.mina.core.buffer.IoBuffer; import org.red5.server.api.scheduling.IScheduledJob; import org.red5.server.api.scheduling.ISchedulingService; @@ -51,157 +51,157 @@ import com.google.gson.Gson; * */ public class VideoStreamListener implements IStreamListener { - private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video"); - - private EventRecordingService recordingService; - private volatile boolean firstPacketReceived = false; - - // Maximum time between video packets - private int videoTimeout = 10000; - private long firstPacketTime = 0L; - private long packetCount = 0L; - - // Last time video was received, not video timestamp - private long lastVideoTime; - - private String userId; - - // Stream being observed - private String streamId; - - // if this stream is recorded or not - private boolean record; - - // Scheduler - private QuartzSchedulingService scheduler; - - // Event queue worker job name - private String timeoutJobName; - - private volatile boolean publishing = false; - - private volatile boolean streamPaused = false; - - private String meetingId; - - public VideoStreamListener(String meetingId, String streamId, Boolean record, - String userId, int packetTimeout, - QuartzSchedulingService scheduler, - EventRecordingService recordingService) { - this.meetingId = meetingId; - this.streamId = streamId; - this.record = record; - this.videoTimeout = packetTimeout; - this.userId = userId; - this.scheduler = scheduler; - this.recordingService = recordingService; - } - - private Long genTimestamp() { - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - } - - public void reset() { - firstPacketReceived = false; - } - - @Override - public void packetReceived(IBroadcastStream stream, IStreamPacket packet) { - IoBuffer buf = packet.getData(); - if (buf != null) - buf.rewind(); - - if (buf == null || buf.remaining() == 0){ - return; - } - - if (packet instanceof VideoData) { - // keep track of last time video was received - lastVideoTime = System.currentTimeMillis(); - packetCount++; - - if (! firstPacketReceived) { - firstPacketReceived = true; - publishing = true; - firstPacketTime = lastVideoTime; - - // start the worker to monitor if we are still receiving video packets - timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob()); - - if (record) { - Map<String, String> event = new HashMap<String, String>(); - event.put("module", "WEBCAM"); - event.put("timestamp", genTimestamp().toString()); - event.put("meetingId", meetingId); - event.put("stream", stream.getPublishedName()); - event.put("eventName", "StartWebcamShareEvent"); - - recordingService.record(meetingId, event); - } - } - - - if (streamPaused) { - streamPaused = false; - long now = System.currentTimeMillis(); - long numSeconds = (now - lastVideoTime)/1000; - - Map<String, Object> logData = new HashMap<String, Object>(); - logData.put("meetingId", meetingId); - logData.put("userId", userId); - logData.put("stream", stream.getPublishedName()); - logData.put("packetCount", packetCount); - logData.put("publishing", publishing); - logData.put("pausedFor (sec)", numSeconds); - - Gson gson = new Gson(); - String logStr = gson.toJson(logData); - - log.warn("Video stream restarted. data={}", logStr ); - } - - } - } - - - public void streamStopped() { - this.publishing = false; - } - - private class TimeoutJob implements IScheduledJob { - private boolean streamStopped = false; - - public void execute(ISchedulingService service) { - Map<String, Object> logData = new HashMap<String, Object>(); - logData.put("meetingId", meetingId); - logData.put("userId", userId); - logData.put("stream", streamId); - logData.put("packetCount", packetCount); - logData.put("publishing", publishing); - - Gson gson = new Gson(); - - long now = System.currentTimeMillis(); - if ((now - lastVideoTime) > videoTimeout && !streamPaused) { - streamPaused = true; - long numSeconds = (now - lastVideoTime)/1000; - - logData.put("lastPacketTime (sec)", numSeconds); - - String logStr = gson.toJson(logData); - - log.warn("Video packet timeout. data={}", logStr ); - - } - - String logStr = gson.toJson(logData); - if (!publishing) { - log.warn("Removing scheduled job. data={}", logStr ); - // remove the scheduled job - scheduler.removeScheduledJob(timeoutJobName); - } + private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video"); + + private EventRecordingService recordingService; + private volatile boolean firstPacketReceived = false; + + // Maximum time between video packets + private int videoTimeout = 10000; + private long firstPacketTime = 0L; + private long packetCount = 0L; + + // Last time video was received, not video timestamp + private long lastVideoTime; + + private String userId; + + // Stream being observed + private String streamId; + + // if this stream is recorded or not + private boolean record; + + // Scheduler + private QuartzSchedulingService scheduler; + + // Event queue worker job name + private String timeoutJobName; + + private volatile boolean publishing = false; + + private volatile boolean streamPaused = false; + + private String meetingId; + + public VideoStreamListener(String meetingId, String streamId, Boolean record, + String userId, int packetTimeout, + QuartzSchedulingService scheduler, + EventRecordingService recordingService) { + this.meetingId = meetingId; + this.streamId = streamId; + this.record = record; + this.videoTimeout = packetTimeout; + this.userId = userId; + this.scheduler = scheduler; + this.recordingService = recordingService; + } + + private Long genTimestamp() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + } + + public void reset() { + firstPacketReceived = false; + } + + @Override + public void packetReceived(IBroadcastStream stream, IStreamPacket packet) { + IoBuffer buf = packet.getData(); + if (buf != null) + buf.rewind(); + + if (buf == null || buf.remaining() == 0) { + return; + } + + if (packet instanceof VideoData) { + // keep track of last time video was received + lastVideoTime = System.currentTimeMillis(); + packetCount++; + + if (!firstPacketReceived) { + firstPacketReceived = true; + publishing = true; + firstPacketTime = lastVideoTime; + + // start the worker to monitor if we are still receiving video packets + timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob()); + + if (record) { + Map<String, String> event = new HashMap<String, String>(); + event.put("module", "WEBCAM"); + event.put("timestamp", genTimestamp().toString()); + event.put("meetingId", meetingId); + event.put("stream", stream.getPublishedName()); + event.put("eventName", "StartWebcamShareEvent"); + + recordingService.record(meetingId, event); } - + } + + + if (streamPaused) { + streamPaused = false; + long now = System.currentTimeMillis(); + long numSeconds = (now - lastVideoTime) / 1000; + + Map<String, Object> logData = new HashMap<String, Object>(); + logData.put("meetingId", meetingId); + logData.put("userId", userId); + logData.put("stream", stream.getPublishedName()); + logData.put("packetCount", packetCount); + logData.put("publishing", publishing); + logData.put("pausedFor (sec)", numSeconds); + + Gson gson = new Gson(); + String logStr = gson.toJson(logData); + + log.warn("Video stream restarted. data={}", logStr); + } + } + } + + + public void streamStopped() { + this.publishing = false; + } + + private class TimeoutJob implements IScheduledJob { + private boolean streamStopped = false; + + public void execute(ISchedulingService service) { + Map<String, Object> logData = new HashMap<String, Object>(); + logData.put("meetingId", meetingId); + logData.put("userId", userId); + logData.put("stream", streamId); + logData.put("packetCount", packetCount); + logData.put("publishing", publishing); + + Gson gson = new Gson(); + + long now = System.currentTimeMillis(); + if ((now - lastVideoTime) > videoTimeout && !streamPaused) { + streamPaused = true; + long numSeconds = (now - lastVideoTime) / 1000; + + logData.put("lastPacketTime (sec)", numSeconds); + + String logStr = gson.toJson(logData); + + log.warn("Video packet timeout. data={}", logStr); + + } + + String logStr = gson.toJson(logData); + if (!publishing) { + log.warn("Removing scheduled job. data={}", logStr); + // remove the scheduled job + scheduler.removeScheduledJob(timeoutJobName); + } + } + + } }