diff --git a/bbb-video/build.gradle b/bbb-video/build.gradle index c883cbbac2f7ec5c357823736c57d4393d3ad5b1..c17b252485afaada836df2514e1a2a699bc0a37e 100755 --- a/bbb-video/build.gradle +++ b/bbb-video/build.gradle @@ -14,6 +14,7 @@ task resolveDeps(type: Copy) { repositories { mavenCentral() + mavenLocal() add(new org.apache.ivy.plugins.resolver.ChainResolver()) { name = 'remote' returnFirst = true @@ -90,6 +91,8 @@ dependencies { compile 'redis.clients:jedis:2.0.0' providedCompile 'commons-pool:commons-pool:1.5.6' compile 'com.google.code.gson:gson:1.7.1' + + compile 'org.bigbluebutton:bbb-common-message:0.0.5' } test { diff --git a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java index 43d81edb2a1bd94f94a5ff09340ad455aa826063..7f1a650418e1c06df338159a50df380838337096 100755 --- a/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java +++ b/bbb-video/src/main/java/org/bigbluebutton/app/video/VideoApplication.java @@ -21,6 +21,8 @@ package org.bigbluebutton.app.video; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; + +import org.bigbluebutton.red5.pubsub.MessagePublisher; import org.red5.logging.Red5LoggerFactory; import org.red5.server.adapter.MultiThreadedApplicationAdapter; import org.red5.server.api.IConnection; @@ -31,6 +33,7 @@ import org.red5.server.api.stream.IServerStream; import org.red5.server.api.stream.IStreamListener; import org.red5.server.stream.ClientBroadcastStream; import org.slf4j.Logger; + import com.google.gson.Gson; public class VideoApplication extends MultiThreadedApplicationAdapter { @@ -38,7 +41,7 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { private IScope appScope; private IServerStream serverStream; - + private MessagePublisher publisher; private boolean recordVideoStream = false; private EventRecordingService recordingService; private final Map<String, IStreamListener> streamListeners = new HashMap<String, IStreamListener>(); @@ -160,6 +163,8 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { @Override public void streamPublishStart(IBroadcastStream stream) { super.streamPublishStart(stream); + IConnection conn = Red5.getConnectionLocal(); + log.info("streamPublishStart " + stream.getPublishedName() + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); } @Override @@ -168,6 +173,12 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { super.streamBroadcastStart(stream); log.info("streamBroadcastStart " + stream.getPublishedName() + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); + String userId = getUserId(); + String meetingId = conn.getScope().getName(); + String streamId = stream.getPublishedName(); + + publisher.userSharedWebcamMessage(meetingId, userId, streamId); + if (recordVideoStream) { recordStream(stream); VideoStreamListener listener = new VideoStreamListener(); @@ -184,15 +195,25 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { @Override public void streamBroadcastClose(IBroadcastStream stream) { super.streamBroadcastClose(stream); + IConnection conn = Red5.getConnectionLocal(); + String scopeName; + if (conn != null) { + scopeName = conn.getScope().getName(); + } else { + log.info("Connection local was null, using scope name from the stream: {}", stream); + scopeName = stream.getScope().getName(); + } + + log.info("streamBroadcastClose " + stream.getPublishedName() + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); + + String userId = getUserId(); + String meetingId = conn.getScope().getName(); + String streamId = stream.getPublishedName(); + + publisher.userUnshareWebcamRequestMessage(meetingId, userId, streamId); + if (recordVideoStream) { - IConnection conn = Red5.getConnectionLocal(); - String scopeName; - if (conn != null) { - scopeName = conn.getScope().getName(); - } else { - log.info("Connection local was null, using scope name from the stream: {}", stream); - scopeName = stream.getScope().getName(); - } + IStreamListener listener = streamListeners.remove(scopeName + "-" + stream.getPublishedName()); if (listener != null) { stream.removeStreamListener(listener); @@ -238,4 +259,76 @@ public class VideoApplication extends MultiThreadedApplicationAdapter { recordingService = s; } + public void setMessagePublisher(MessagePublisher publisher) { + this.publisher = publisher; + } + + /** + * Start transmission notification from Flash Player 11.1+. This command asks the server to transmit more data because the buffer is running low. + * + * http://help.adobe.com/en_US/flashmediaserver/devguide/WSd391de4d9c7bd609-569139412a3743e78e-8000.html + * + * @param bool boolean + * @param num number + */ + public void startTransmit(Boolean bool, int num) { + + } + + /** + * Stop transmission notification from Flash Player 11.1+. This command asks the server to suspend transmission until the client sends a + * startTransmit event because there is enough data in the buffer. + */ + public void stopTransmit() { + } + + /** + * Stop transmission notification from Flash Player 11.1+. This command asks the server to suspend transmission until the client sends a + * startTransmit event because there is enough data in the buffer. + * + * @param bool boolean + * @param num number + */ + public void stopTransmit(Boolean bool, int num) { + } + + /** + * Notification method that is sent by FME just before publishing starts. + * + * @param streamName Name of stream that is about to be published. + */ + @Override + public void FCPublish(String streamName) { + IConnection conn = Red5.getConnectionLocal(); + log.info("FCPublish " + streamName + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); + } + + /** + * Notification method that is sent by FME when publishing of a stream ends. + */ + @Override + public void FCUnpublish() { + } + + /** + * Notification method that is sent by FME when publishing of a stream ends. + * + * @param streamName Name of stream that is about to be un-published. + */ + @Override + public void FCUnpublish(String streamName) { + IConnection conn = Red5.getConnectionLocal(); + log.info("FCUnpublish " + streamName + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); + } + + /** + * Notification method that is sent by some clients just before playback starts. + * + * @param streamName Name of stream that is about to be played. + */ + @Override + public void FCSubscribe(String streamName) { + IConnection conn = Red5.getConnectionLocal(); + log.info("FCSubscribe " + streamName + " " + System.currentTimeMillis() + " " + conn.getScope().getName()); + } } diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/GenericObjectPoolConfigWrapper.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/GenericObjectPoolConfigWrapper.java new file mode 100755 index 0000000000000000000000000000000000000000..0ed93505382b95ba3f10076f2ab28fd4f9441579 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/GenericObjectPoolConfigWrapper.java @@ -0,0 +1,142 @@ +/** +* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ +* +* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). +* +* This program is free software; you can redistribute it and/or modify it under the +* terms of the GNU Lesser General Public License as published by the Free Software +* Foundation; either version 3.0 of the License, or (at your option) any later +* version. +* +* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY +* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. +* +* You should have received a copy of the GNU Lesser General Public License along +* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. +* +*/ +package org.bigbluebutton.red5.pubsub; + +import org.apache.commons.pool.impl.GenericObjectPool; + +public class GenericObjectPoolConfigWrapper { + + private final GenericObjectPool.Config config; + + public GenericObjectPoolConfigWrapper() { + this.config = new GenericObjectPool.Config(); + } + + public GenericObjectPool.Config getConfig() { + return config; + } + + public int getMaxIdle() { + return this.config.maxIdle; + } + + public void setMaxIdle(int maxIdle) { + this.config.maxIdle = maxIdle; + } + + public int getMinIdle() { + return this.config.minIdle; + } + + public void setMinIdle(int minIdle) { + this.config.minIdle = minIdle; + } + + public int getMaxActive() { + return this.config.maxActive; + } + + public void setMaxActive(int maxActive) { + this.config.maxActive = maxActive; + } + + public long getMaxWait() { + return this.config.maxWait; + } + + public void setMaxWait(long maxWait) { + this.config.maxWait = maxWait; + } + + public byte getWhenExhaustedAction() { + return this.config.whenExhaustedAction; + } + + public void setWhenExhaustedAction(byte whenExhaustedAction) { + this.config.whenExhaustedAction = whenExhaustedAction; + } + + public boolean isTestOnBorrow() { + return this.config.testOnBorrow; + } + + public void setTestOnBorrow(boolean testOnBorrow) { + this.config.testOnBorrow = testOnBorrow; + } + + public boolean isTestOnReturn() { + return this.config.testOnReturn; + } + + public void setTestOnReturn(boolean testOnReturn) { + this.config.testOnReturn = testOnReturn; + } + + public boolean isTestWhileIdle() { + return this.config.testWhileIdle; + } + + public void setTestWhileIdle(boolean testWhileIdle) { + this.config.testWhileIdle = testWhileIdle; + } + + public long getTimeBetweenEvictionRunsMillis() { + return this.config.timeBetweenEvictionRunsMillis; + } + + public void setTimeBetweenEvictionRunsMillis( + long timeBetweenEvictionRunsMillis) { + this.config.timeBetweenEvictionRunsMillis = + timeBetweenEvictionRunsMillis; + } + + public int getNumTestsPerEvictionRun() { + return this.config.numTestsPerEvictionRun; + } + + public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { + this.config.numTestsPerEvictionRun = numTestsPerEvictionRun; + } + + public long getMinEvictableIdleTimeMillis() { + return this.config.minEvictableIdleTimeMillis; + } + + public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) { + this.config.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis; + } + + public long getSoftMinEvictableIdleTimeMillis() { + return this.config.softMinEvictableIdleTimeMillis; + } + + public void setSoftMinEvictableIdleTimeMillis( + long softMinEvictableIdleTimeMillis) { + this.config.softMinEvictableIdleTimeMillis = + softMinEvictableIdleTimeMillis; + } + + public boolean isLifo() { + return this.config.lifo; + } + + public void setLifo(boolean lifo) { + this.config.lifo = lifo; + } +} diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java new file mode 100755 index 0000000000000000000000000000000000000000..db0c5b0584616338394c6f264ec382c35acb1bcd --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessagePublisher.java @@ -0,0 +1,24 @@ +package org.bigbluebutton.red5.pubsub; + +import org.bigbluebutton.common.messages.*; + +public class MessagePublisher { + + private MessageSender sender; + + public void setMessageSender(MessageSender sender) { + this.sender = sender; + } + + // Polling + public void userSharedWebcamMessage(String meetingId, String userId, String streamId) { + UserSharedWebcamMessage msg = new UserSharedWebcamMessage(meetingId, userId, streamId); + sender.send(MessagingConstants.TO_USERS_CHANNEL, msg.toJson()); + } + + public void userUnshareWebcamRequestMessage(String meetingId, String userId, String streamId) { + UserUnshareWebcamRequestMessage msg = new UserUnshareWebcamRequestMessage(meetingId, userId, streamId); + sender.send(MessagingConstants.TO_USERS_CHANNEL, msg.toJson()); + } + +} diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java new file mode 100755 index 0000000000000000000000000000000000000000..74003393f434496d7aa7f8c8d823f68f56253b04 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageSender.java @@ -0,0 +1,74 @@ +package org.bigbluebutton.red5.pubsub; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import org.red5.logging.Red5LoggerFactory; +import org.slf4j.Logger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; + +public class MessageSender { + private static Logger log = Red5LoggerFactory.getLogger(MessageSender.class, "bigbluebutton"); + + private JedisPool redisPool; + private volatile boolean sendMessage = false; + + private final Executor msgSenderExec = Executors.newSingleThreadExecutor(); + private final Executor runExec = Executors.newSingleThreadExecutor(); + private BlockingQueue<MessageToSend> messages = new LinkedBlockingQueue<MessageToSend>(); + + public void stop() { + sendMessage = false; + } + + public void start() { + log.info("Redis message publisher starting!"); + try { + sendMessage = true; + + Runnable messageSender = new Runnable() { + public void run() { + while (sendMessage) { + try { + MessageToSend msg = messages.take(); + publish(msg.getChannel(), msg.getMessage()); + } catch (InterruptedException e) { + log.warn("Failed to get message from queue."); + } + } + } + }; + msgSenderExec.execute(messageSender); + } catch (Exception e) { + log.error("Error subscribing to channels: " + e.getMessage()); + } + } + + public void send(String channel, String message) { + MessageToSend msg = new MessageToSend(channel, message); + messages.add(msg); + } + + private void publish(final String channel, final String message) { + Runnable task = new Runnable() { + public void run() { + Jedis jedis = redisPool.getResource(); + try { + jedis.publish(channel, message); + } catch(Exception e){ + log.warn("Cannot publish the message to redis", e); + } finally { + redisPool.returnResource(jedis); + } + } + }; + + runExec.execute(task); + } + + public void setRedisPool(JedisPool redisPool){ + this.redisPool = redisPool; + } +} diff --git a/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageToSend.java b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageToSend.java new file mode 100755 index 0000000000000000000000000000000000000000..3ae3874039b4637af954bf7d5d176bc80d268405 --- /dev/null +++ b/bbb-video/src/main/java/org/bigbluebutton/red5/pubsub/MessageToSend.java @@ -0,0 +1,19 @@ +package org.bigbluebutton.red5.pubsub; + +public class MessageToSend { + private final String channel; + private final String message; + + public MessageToSend(String channel, String message) { + this.channel = channel; + this.message = message; + } + + public String getChannel() { + return channel; + } + + public String getMessage() { + return message; + } +} diff --git a/bbb-video/src/main/webapp/WEB-INF/bbb-redis-pool.xml b/bbb-video/src/main/webapp/WEB-INF/bbb-redis-pool.xml new file mode 100755 index 0000000000000000000000000000000000000000..6144f6571fd02d0b9bf8ce98a8d22b3ce8e9ab65 --- /dev/null +++ b/bbb-video/src/main/webapp/WEB-INF/bbb-redis-pool.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + +BigBlueButton open source conferencing system - http://www.bigbluebutton.org/ + +Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below). + +This program is free software; you can redistribute it and/or modify it under the +terms of the GNU Lesser General Public License as published by the Free Software +Foundation; either version 3.0 of the License, or (at your option) any later +version. + +BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY +WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A +PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans-2.5.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util-2.0.xsd + "> + + <bean id="redisPool" class="redis.clients.jedis.JedisPool"> + <constructor-arg index="0"> + <bean factory-bean="config" factory-method="getConfig" /> + </constructor-arg> + <constructor-arg index="1" value="${redis.host}"/> + <constructor-arg index="2" value="${redis.port}"/> + </bean> + + <bean id="config" class="org.bigbluebutton.red5.pubsub.GenericObjectPoolConfigWrapper"> + <!-- Action to take when trying to acquire a connection and all connections are taken --> + <property name="whenExhaustedAction"> + <!-- Fail-fast behaviour, we don't like to keep the kids waiting --> + <util:constant static-field="org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_FAIL" /> + <!-- Default behaviour, block the caller until a resource becomes available --> + <!--<util:constant static-field="org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_BLOCK" />--> + </property> + <!-- Maximum active connections to Redis instance --> + <property name="maxActive" value="12" /> + <!-- Number of connections to Redis that just sit there and do nothing --> + <property name="maxIdle" value="6" /> + <!-- Minimum number of idle connections to Redis - these can be seen as always open and ready to serve --> + <property name="minIdle" value="1" /> + <!-- Tests whether connection is dead when connection retrieval method is called --> + <property name="testOnBorrow" value="true" /> + <!-- Tests whether connection is dead when returning a connection to the pool --> + <property name="testOnReturn" value="true" /> + <!-- Tests whether connections are dead during idle periods --> + <property name="testWhileIdle" value="true" /> + <!-- Maximum number of connections to test in each idle check --> + <property name="numTestsPerEvictionRun" value="12" /> + <!-- Idle connection checking period --> + <property name="timeBetweenEvictionRunsMillis" value="60000" /> + <!-- Maximum time, in milliseconds, to wait for a resource when exausted action is set to WHEN_EXAUSTED_BLOCK --> + <property name="maxWait" value="5000" /> + </bean> + +</beans> diff --git a/bbb-video/src/main/webapp/WEB-INF/red5-web.xml b/bbb-video/src/main/webapp/WEB-INF/red5-web.xml index 569c06d0f7bde28c4130de9c2afbcc0872e5fc47..899c929cfd761a9b01e05fc062b9cd5d144f1127 100755 --- a/bbb-video/src/main/webapp/WEB-INF/red5-web.xml +++ b/bbb-video/src/main/webapp/WEB-INF/red5-web.xml @@ -48,10 +48,22 @@ with BigBlueButton; if not, see <http://www.gnu.org/licenses/>. <bean id="web.handler" class="org.bigbluebutton.app.video.VideoApplication"> <property name="recordVideoStream" value="true"/> <property name="eventRecordingService" ref="redisRecorder"/> + <property name="messagePublisher" ref="redisPublisher"/> </bean> + <bean id="redisPublisher" class="org.bigbluebutton.red5.pubsub.MessagePublisher"> + <property name="messageSender" ref="redisSender"/> + </bean> + <bean id="redisRecorder" class="org.bigbluebutton.app.video.EventRecordingService"> <constructor-arg index="0" value="${redis.host}"/> <constructor-arg index="1" value="${redis.port}"/> </bean> + + <bean id="redisSender" class="org.bigbluebutton.red5.pubsub.MessageSender" + init-method="start" destroy-method="stop"> + <property name="redisPool"> <ref bean="redisPool"/></property> + </bean> + + <import resource="bbb-redis-pool.xml"/> </beans>