Skip to content
Snippets Groups Projects
Commit 71e0ca9d authored by Tainan Felipe's avatar Tainan Felipe
Browse files

Add sync and reconnection handler

parent b298bf43
No related branches found
No related tags found
No related merge requests found
Showing with 196 additions and 18 deletions
......@@ -3,8 +3,12 @@ import sendGroupChatMsg from './methods/sendGroupChatMsg';
import clearPublicChatHistory from './methods/clearPublicChatHistory';
import startUserTyping from './methods/startUserTyping';
import stopUserTyping from './methods/stopUserTyping';
import chatMessageBeforeJoinCounter from './methods/chatMessageBeforeJoinCounter';
import fetchMessagePerPage from './methods/fetchMessagePerPage';
Meteor.methods({
fetchMessagePerPage,
chatMessageBeforeJoinCounter,
sendGroupChatMsg,
clearPublicChatHistory,
startUserTyping,
......
import { Meteor } from 'meteor/meteor';
import { check } from 'meteor/check';
import GroupChat from '/imports/api/group-chat';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import Users from '/imports/api/users';
import { extractCredentials } from '/imports/api/common/server/helpers';
const CHAT_CONFIG = Meteor.settings.public.chat;
const PUBLIC_CHAT_TYPE = CHAT_CONFIG.type_public;
export default function chatMessageBeforeJoinCounter() {
const { meetingId, requesterUserId } = extractCredentials(this.userId);
const groupChats = GroupChat.find({
$or: [
{ meetingId, access: PUBLIC_CHAT_TYPE },
{ meetingId, users: { $all: [requesterUserId] } },
],
}).fetch();
const User = Users.findOne({ userId: requesterUserId, meetingId });
const chatIdWithCounter = groupChats.map((groupChat) => {
const msgCount = GroupChatMsg.find({ chatId: groupChat.chatId, timestamp: { $lt: User.authTokenValidatedTime } }).count();
return {
chatId: groupChat.chatId,
count: msgCount,
};
}).filter(chat => chat.count);
return chatIdWithCounter;
}
import { Meteor } from 'meteor/meteor';
import GroupChat from '/imports/api/group-chat';
import { GroupChatMsg } from '/imports/api/group-chat-msg';
import Users from '/imports/api/users';
import { extractCredentials } from '/imports/api/common/server/helpers';
const CHAT_CONFIG = Meteor.settings.public.chat;
const ITENS_PER_PAGE = CHAT_CONFIG.itemsPerPage;
export default function fetchMessagePerPage(chatId, page) {
const { meetingId, requesterUserId } = extractCredentials(this.userId);
const User = Users.findOne({ userId: requesterUserId, meetingId });
const messages = GroupChatMsg.find({ chatId, meetingId, timestamp: { $lt: User.authTokenValidatedTime } },
{
sort: { timestamp: 1 },
skip: page > 0 ? ((page - 1) * ITENS_PER_PAGE) : 0,
limit: ITENS_PER_PAGE,
})
.fetch();
return messages;
}
......@@ -22,14 +22,12 @@ function groupChatMsg(chatsIds) {
const User = Users.findOne({ userId });
const selector = {
// change loginTime to lasJoinTime when available
timestamp: { $gte: User.loginTime },
timestamp: { $gte: User.authTokenValidatedTime },
$or: [
{ meetingId, chatId: { $eq: PUBLIC_GROUP_CHAT_ID } },
{ chatId: { $in: chatsIds } },
],
};
console.log('Users\n\n', selector, GroupChatMsg.find(selector).fetch());
return GroupChatMsg.find(selector);
}
......
......@@ -23,7 +23,8 @@ const DEBOUNCE_TIME = 1000;
const sysMessagesIds = {
welcomeId: `${SYSTEM_CHAT_TYPE}-welcome-msg`,
moderatorId: `${SYSTEM_CHAT_TYPE}-moderator-msg`
moderatorId: `${SYSTEM_CHAT_TYPE}-moderator-msg`,
syncId: `${SYSTEM_CHAT_TYPE}-sync-msg`
};
const intlMessages = defineMessages({
......@@ -43,6 +44,10 @@ const intlMessages = defineMessages({
id: 'app.chat.partnerDisconnected',
description: 'System chat message when the private chat partnet disconnect from the meeting',
},
loading: {
id: 'app.chat.loading',
description: 'loading message',
},
});
let previousChatId = null;
......@@ -121,7 +126,23 @@ const ChatContainer = (props) => {
applyPropsToState = () => {
if (!_.isEqualWith(lastMsg, stateLastMsg) || previousChatId !== chatID) {
const timeWindowsValues = isPublicChat
? [...Object.values(contextChat?.preJoinMessages || {}), ...systemMessagesIds.map((item) => systemMessages[item]),
? [
...(
!contextChat?.syncing ? Object.values(contextChat?.preJoinMessages || {}) : [
{
id: sysMessagesIds.syncId,
content: [{
id: 'synced',
text: intl.formatMessage(intlMessages.loading, { 0: contextChat?.syncedPercent}),
time: loginTime + 1,
}],
key: sysMessagesIds.syncId,
time: loginTime + 1,
sender: null,
}
]
)
, ...systemMessagesIds.map((item) => systemMessages[item]),
...Object.values(contextChat?.posJoinMessages || {})]
: [...Object.values(contextChat?.messageGroups || {})];
if (previousChatId !== chatID) {
......
import { useContext, useEffect } from 'react';
import { useContext, useEffect, useState } from 'react';
import _ from 'lodash';
import { ChatContext, ACTIONS } from './context';
import { UsersContext } from '../users-context/context';
import { makeCall } from '/imports/ui/services/api';
import ChatLogger from '/imports/ui/components/chat/chat-logger/ChatLogger';
let usersData = {};
let messageQueue = [];
const CHAT_CONFIG = Meteor.settings.public.chat;
const ITENS_PER_PAGE = CHAT_CONFIG.itemsPerPage;
const TIME_BETWEEN_FETCHS = CHAT_CONFIG.timeBetweenFetchs;
const getMessagesBeforeJoinCounter = async () => {
const counter = await makeCall('chatMessageBeforeJoinCounter');
return counter;
};
const startSyncMessagesbeforeJoin = async (dispatch) => {
const chatsMessagesCount = await getMessagesBeforeJoinCounter();
const pagesPerChat = chatsMessagesCount.map(chat => ({ ...chat, pages: Math.ceil(chat.count / ITENS_PER_PAGE), syncedPages: 0 }));
const syncRoutine = async (chatsToSync) => {
if (!chatsToSync.length) return;
const pagesToFetch = [...chatsToSync].sort((a, b) => a.pages - b.pages);
const chatWithLessPages = pagesToFetch[0];
chatWithLessPages.syncedPages += 1;
const messagesFromPage = await makeCall('fetchMessagePerPage', chatWithLessPages.chatId, chatWithLessPages.syncedPages);
if (messagesFromPage.length) {
dispatch({
type: ACTIONS.ADDED,
value: messagesFromPage,
});
dispatch({
type: ACTIONS.SYNC_STATUS,
value: {
chatId: chatWithLessPages.chatId,
percentage: Math.floor((chatWithLessPages.syncedPages / chatWithLessPages.pages) * 100),
},
});
}
await new Promise(r => setTimeout(r, TIME_BETWEEN_FETCHS));
syncRoutine(pagesToFetch.filter(chat => !(chat.syncedPages > chat.pages)));
};
syncRoutine(pagesPerChat);
};
const Adapter = () => {
const usingChatContext = useContext(ChatContext);
const { dispatch } = usingChatContext;
const usingUsersContext = useContext(UsersContext);
const { users } = usingUsersContext;
const [syncStarted, setSync] = useState(true);
ChatLogger.trace('chatAdapter::body::users', users);
useEffect(() => {
const connectionStatus = Meteor.status();
if (connectionStatus.connected && !syncStarted) {
setSync(true);
startSyncMessagesbeforeJoin(dispatch);
}
}, [Meteor.status().connected, syncStarted]);
useEffect(() => {
usersData = users;
}, [usingUsersContext]);
useEffect(() => {
// TODO: listen to websocket message to avoid full list comparsion
if (!Meteor.status().connected) return;
setSync(false);
dispatch({
type: ACTIONS.CLEAR_ALL,
});
const throttledDispatch = _.throttle(() => {
const dispatchedMessageQueue = [...messageQueue];
messageQueue = [];
......@@ -32,9 +91,7 @@ const Adapter = () => {
if (msg.data.indexOf('{"msg":"added","collection":"group-chat-msg"') != -1) {
const parsedMsg = JSON.parse(msg.data);
if (parsedMsg.msg === 'added') {
messageQueue.push({
msg: parsedMsg.fields,
});
messageQueue.push(parsedMsg.fields);
throttledDispatch();
}
}
......@@ -44,7 +101,7 @@ const Adapter = () => {
});
}
});
}, []);
}, [Meteor.status().connected]);
return null;
};
......
......@@ -22,13 +22,16 @@ export const ACTIONS = {
REMOVED: 'removed',
LAST_READ_MESSAGE_TIMESTAMP_CHANGED: 'last_read_message_timestamp_changed',
INIT: 'initial_structure',
SYNC_STATUS: 'sync_status',
HAS_MESSAGE_TO_SYNC: 'has_message_to_sync',
CLEAR_ALL: 'clear_all',
};
const ROLE_MODERATOR = Meteor.settings.public.user.role_moderator;
export const getGroupingTime = () => Meteor.settings.public.chat.grouping_messages_window;
export const getGroupChatId = () => Meteor.settings.public.chat.public_group_id;
export const getLoginTime = () => (Users.findOne({ userId: Auth.userID }) || {}).loginTime || 0;
export const getLoginTime = () => (Users.findOne({ userId: Auth.userID }) || {}).authTokenValidatedTime || 0;
const generateTimeWindow = (timestamp) => {
const groupingTime = getGroupingTime();
......@@ -40,12 +43,12 @@ const generateTimeWindow = (timestamp) => {
export const ChatContext = createContext();
const generateStateWithNewMessage = ({ msg, senderData }, state) => {
const generateStateWithNewMessage = (msg, state) => {
const timeWindow = generateTimeWindow(msg.timestamp);
const userId = msg.sender.id;
const keyName = userId + '-' + timeWindow;
const msgBuilder = ({msg, senderData}, chat) => {
const msgBuilder = (msg, chat) => {
const msgTimewindow = generateTimeWindow(msg.timestamp);
const key = msg.sender.id + '-' + msgTimewindow;
const chatIndex = chat?.chatIndexes[key];
......@@ -80,6 +83,7 @@ const generateStateWithNewMessage = ({ msg, senderData }, state) => {
chatIndexes: {},
preJoinMessages: {},
posJoinMessages: {},
synced:true,
unreadTimeWindows: new Set(),
unreadCount: 0,
};
......@@ -87,6 +91,7 @@ const generateStateWithNewMessage = ({ msg, senderData }, state) => {
state[msg.chatId] = {
count: 0,
lastSender: '',
synced:true,
chatIndexes: {},
messageGroups: {},
unreadTimeWindows: new Set(),
......@@ -106,7 +111,7 @@ const generateStateWithNewMessage = ({ msg, senderData }, state) => {
if (!groupMessage || (groupMessage && groupMessage.sender.id !== stateMessages.lastSender.id)) {
const [tempGroupMessage, sender, newIndex] = msgBuilder({msg, senderData}, stateMessages);
const [tempGroupMessage, sender, newIndex] = msgBuilder(msg, stateMessages);
stateMessages.lastSender = sender;
stateMessages.chatIndexes[keyName] = newIndex;
stateMessages.lastTimewindow = keyName + '-' + newIndex;
......@@ -161,7 +166,7 @@ const reducer = (state, action) => {
const currentClosedChats = Storage.getItem(CLOSED_CHAT_LIST_KEY) || [];
const loginTime = getLoginTime();
const newState = batchMsgs.reduce((acc, i)=> {
const message = i.msg;
const message = i;
const chatId = message.chatId;
if (
chatId !== PUBLIC_GROUP_CHAT_KEY
......@@ -169,8 +174,7 @@ const reducer = (state, action) => {
&& currentClosedChats.includes(chatId) ){
closedChatsToOpen.add(chatId)
}
return generateStateWithNewMessage(i, acc);
return generateStateWithNewMessage(message, acc);
}, state);
if (closedChatsToOpen.size) {
......@@ -260,6 +264,45 @@ const reducer = (state, action) => {
}
return state;
}
case ACTIONS.SYNC_STATUS: {
ChatLogger.debug(ACTIONS.SYNC_STATUS);
const newState = { ...state };
newState[action.value.chatId].syncedPercent = action.value.percentage;
newState[action.value.chatId].syncing = action.value.percentage < 100 ? true : false;
return newState;
}
case ACTIONS.CLEAR_ALL: {
ChatLogger.debug(ACTIONS.CLEAR_ALL);
const newState = { ...state };
const chatIds = Object.keys(newState);
chatIds.forEach((chatId) => {
newState[chatId] = chatId === PUBLIC_GROUP_CHAT_KEY ?
{
count: 0,
lastSender: '',
chatIndexes: {},
preJoinMessages: {},
posJoinMessages: {},
syncing: false,
syncedPercent: 0,
unreadTimeWindows: new Set(),
unreadCount: 0,
}
:
{
count: 0,
lastSender: '',
chatIndexes: {},
messageGroups: {},
syncing: false,
syncedPercent: 0,
unreadTimeWindows: new Set(),
unreadCount: 0,
};
});
return newState;
}
default: {
throw new Error(`Unexpected action: ${JSON.stringify(action)}`);
}
......
......@@ -298,6 +298,8 @@ public:
lines: 2
time: 5000
chat:
itemsPerPage: 100
timeBetweenFetchs: 1000
enabled: true
bufferChatInsertsMs: 0
startClosed: false
......
{
"app.home.greeting": "Your presentation will begin shortly ...",
"app.chat.submitLabel": "Send message",
"app.chat.loading": "loading: {0}%",
"app.chat.errorMaxMessageLength": "The message is {0} characters(s) too long",
"app.chat.disconnected": "You are disconnected, messages can't be sent",
"app.chat.locked": "Chat is locked, messages can't be sent",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment