Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions api/main_endpoints/models/ChatMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const mongoose = require('mongoose');

const ChatMessageSchema = new mongoose.Schema({
createdAt: {
type: Date,
default: Date.now,
index: true,
},
expiresAt: {
type: Date,
default: () => new Date(Date.now() + 5 * 1000), // 24 hours from now
index: {expires: 0}
},
chatroomId: {
type: String,
required: true,
index: true,
ref: 'Chatroom',
},
userId: {
type: mongoose.Schema.Types.ObjectId,
required: true,
ref: 'User',
},
text: {
type: String,
required: true,
}
});

// Compound index
ChatMessageSchema.index({ chatroomId: 1, createdAt: -1 });

module.exports = mongoose.model('ChatMessage', ChatMessageSchema);
116 changes: 82 additions & 34 deletions api/main_endpoints/routes/Messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const {
const { MAX_AMOUNT_OF_CONNECTIONS } = require('../../util/constants').MESSAGES_API;
const express = require('express');
const router = express.Router();
const ChatMessage = require('../../main_endpoints/models/ChatMessage');
const bodyParser = require('body-parser');
const User = require('../models/User.js');
const logger = require('../../util/logger');
Expand All @@ -22,9 +23,9 @@ router.use(bodyParser.json());
const clients = {};
const numberOfConnections = {};
const lastMessageSent = {};

const writeMessage = ((roomId, message, username) => {

// function to save and brodcast messages
const writeMessage = async (roomId, message, username, userId) => {
// msg obj to send over sse
const messageObj = {
timestamp: Date.now(),
message,
Expand All @@ -35,26 +36,42 @@ const writeMessage = ((roomId, message, username) => {
clients[roomId].forEach(res => res.write(`data: ${JSON.stringify(messageObj)}\n\n`));
}

try {
const newMessage = new ChatMessage({ // saves msg to mongo
chatroomId: roomId,
userId: userId,
text: message,
});
await newMessage.save(); // keeps msg
} catch (err) {
logger.error('Error saving chat message to MongoDB:', err);
}

lastMessageSent[roomId] = JSON.stringify(messageObj);

// increase the total messages sent counter
MetricsHandler.totalMessagesSent.inc();

// increase the total amount of messages sent per chatroom counter
MetricsHandler.totalChatMessagesPerChatRoom.labels(roomId).inc();
});
};

router.post('/send', async (req, res) => {

const {message, id} = req.body;
const {message, id: roomId} = req.body;
if (!message || !roomId) {
return res.status(BAD_REQUEST).send('Message and Room ID are required');
}

const token = req.headers['authorization'];
const apiKey = req.headers['x-api-key'];


const required = [
{value: token || apiKey, title: 'Token or API Key', },
{value: message, title: 'Message', },
{value: id, title: 'Room ID', },
{value: roomId, title: 'Room ID' },

];

const missingValue = required.find(({value}) => !value);
Expand All @@ -64,7 +81,8 @@ router.post('/send', async (req, res) => {
return;
}

let nameToUse = null;

let nameToUse, userId;

if (apiKey) {
try {
Expand All @@ -73,20 +91,22 @@ router.post('/send', async (req, res) => {
return res.sendStatus(UNAUTHORIZED);
}
nameToUse = result.firstName;
userId = result._id;
} catch (error) {
logger.error('Error in /send User.findOne: ', error);
return res.sendStatus(SERVER_ERROR);
}
}

} else {
// Assume user passed a non null/undefined token
const userObj = decodeToken(req);
if (!userObj) {
return res.sendStatus(UNAUTHORIZED);
const userObj = decodeToken(req);
if (!userObj) {
return res.sendStatus(UNAUTHORIZED);
}
nameToUse = userObj.firstName;
userId = userObj._id;
}
nameToUse = userObj.firstName;
try {
writeMessage(id, `${message}`, `${nameToUse}:`);
await writeMessage(roomId, message, `${nameToUse}:`, userId);
return res.json({ status: 'Message sent' });
} catch (error) {
logger.error('Error in /send writeMessage: ', error);
Expand All @@ -95,11 +115,11 @@ router.post('/send', async (req, res) => {
});

router.get('/getLatestMessage', async (req, res) => {
const {apiKey, id} = req.query;
const {apiKey, id: roomId} = req.query;

const required = [
{value: apiKey, title: 'API Key'},
{value: id, title: 'Room ID'},
{value: roomId, title: 'Room ID'},
];

const missingValue = required.find(({value}) => !value);
Expand All @@ -109,29 +129,37 @@ router.get('/getLatestMessage', async (req, res) => {
return;
}

try {
User.findOne({ apiKey }, (error, result) => {
if (error) {
logger.error('/listen received an invalid API key: ', error);
res.sendStatus(SERVER_ERROR);
return;
}

if (!result) { // return unauthorized if no api key found
return res.sendStatus(UNAUTHORIZED);
}

if (!lastMessageSent[id]) {
User.findOne({ apiKey }, (error, result) => {
if (error) {
logger.error('/listen received an invalid API key: ', error);
res.sendStatus(SERVER_ERROR);
return;
}

if (!result) { // return unauthorized if no api key found
return res.sendStatus(UNAUTHORIZED);
}
/*
if (!lastMessageSent[roomId]) {
return res.status(OK).send('Room closed');
}

return res.status(OK).send(lastMessageSent[id]);

});
} catch (error) {
logger.error('Error in /get: ', error);
res.sendStatus(SERVER_ERROR);
}
return res.status(OK).send(lastMessageSent[roomId]);
*/
ChatMessage.find({chatroomId: roomId})
.sort({ createdAt: -1 })
.limit(20)
.lean()
.exec((err, messages) => {
if(err){
logger.error('Error querying messages', err);
return res.sendStatus(SERVER_ERROR);
}
return res.status(OK).json(messages);
});
});
});

router.get('/listen', async (req, res) => {
Expand Down Expand Up @@ -195,6 +223,26 @@ router.get('/listen', async (req, res) => {
MetricsHandler.currentConnectionsOpen.labels(id).inc();

clients[id].push(res);
// sends chat history
(async () => {
try {
const history = await ChatMessage.findOne({chatroomId: id})
.sort({createdAt:-1}) // loads newest msg first
.lean();
if(!history){
return;
}
const user = await User.findById(history.userId, { firstName: 1 }).lean();
const username = `${(user && user.firstName) || 'Unknown'}:`;
res.write(`data: ${JSON.stringify({
timestamp: new Date(history.createdAt || Date.now()).getTime(),
message: history.text,
username
})}\n\n`);
}catch (e) {
logger.error('history hydration error', e);
}
})();

req.on('close', () => {
if(clients[id]){
Expand Down