Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions apps/meteor/server/modules/notifications/notifications.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Rooms, Subscriptions, Users } from '@rocket.chat/models';
import type { ImporterProgress } from '../../../app/importer/server/classes/ImporterProgress';
import { emit, StreamPresence } from '../../../app/notifications/server/lib/Presence';
import { SystemLogger } from '../../lib/logger/system';
import { getCachedUserForPublication } from '../streamer/publication-user-cache';
import type { IStreamer, IStreamerConstructor, IPublication } from '../streamer/types';

export class NotificationsModule {
Expand Down Expand Up @@ -98,13 +99,14 @@ export class NotificationsModule {
return false;
}

const user = this.userId ? { _id: this.userId } : undefined;
return Authorization.canReadRoom(room, user, extraData);
const user = await getCachedUserForPublication(this);
return Authorization.canReadRoom(room, user ?? undefined, extraData);
});

this.streamRoomMessage.allowRead('__my_messages__', 'all');
this.streamRoomMessage.allowEmit('__my_messages__', async function (_eventName, { rid }) {
if (!this.userId) {
const user = await getCachedUserForPublication(this);
if (!user) {
return false;
}

Expand All @@ -114,12 +116,12 @@ export class NotificationsModule {
return false;
}

const canAccess = await Authorization.canAccessRoom(room, { _id: this.userId });
const canAccess = await Authorization.canAccessRoom(room, user);
if (!canAccess) {
return false;
}

const roomParticipant = await Subscriptions.countByRoomIdAndUserId(room._id, this.userId);
const roomParticipant = await Subscriptions.countByRoomIdAndUserId(room._id, user._id);

return {
roomParticipant: roomParticipant > 0,
Expand All @@ -135,10 +137,11 @@ export class NotificationsModule {
this.streamAll.allowWrite('none');
this.streamAll.allowRead('all');
this.streamLogged.allowRead('private-settings-changed', async function () {
if (this.userId == null) {
const user = await getCachedUserForPublication(this);
if (!user) {
return false;
}
return Authorization.hasAtLeastOnePermission(this.userId, [
return Authorization.hasAtLeastOnePermission(user, [
'view-privileged-setting',
'edit-privileged-setting',
'manage-selected-settings',
Expand Down Expand Up @@ -168,10 +171,11 @@ export class NotificationsModule {
return !!room && room.t === 'l' && room.v.token === extraData.token;
}

if (!this.userId) {
const user = await getCachedUserForPublication(this);
if (!user) {
return false;
}
const canAccess = await Authorization.canAccessRoomId(room._id, this.userId);
const canAccess = await Authorization.canAccessRoom(room, user);

return canAccess;
});
Expand Down Expand Up @@ -313,19 +317,17 @@ export class NotificationsModule {

this.streamCannedResponses.allowWrite('none');
this.streamCannedResponses.allowRead(async function () {
return (
!!this.userId &&
!!(await Settings.get('Canned_Responses_Enable')) &&
Authorization.hasPermission(this.userId, 'view-canned-responses')
);
const user = await getCachedUserForPublication(this);
return !!user && !!(await Settings.get('Canned_Responses_Enable')) && Authorization.hasPermission(user, 'view-canned-responses');
});

this.streamIntegrationHistory.allowWrite('none');
this.streamIntegrationHistory.allowRead(async function () {
if (!this.userId) {
const user = await getCachedUserForPublication(this);
if (!user) {
return false;
}
return Authorization.hasAtLeastOnePermission(this.userId, ['manage-outgoing-integrations', 'manage-own-outgoing-integrations']);
return Authorization.hasAtLeastOnePermission(user, ['manage-outgoing-integrations', 'manage-own-outgoing-integrations']);
});

this.streamLivechatRoom.allowRead(async (roomId, extraData) => {
Expand All @@ -346,12 +348,14 @@ export class NotificationsModule {

this.streamLivechatQueueData.allowWrite('none');
this.streamLivechatQueueData.allowRead(async function () {
return this.userId ? Authorization.hasPermission(this.userId, 'view-l-room') : false;
const user = await getCachedUserForPublication(this);
return user ? Authorization.hasPermission(user, 'view-l-room') : false;
});

this.streamRoomData.allowWrite('none');
this.streamRoomData.allowRead(async function (rid) {
if (!this.userId) {
const user = await getCachedUserForPublication(this);
if (!user) {
return false;
}

Expand All @@ -361,7 +365,7 @@ export class NotificationsModule {
return false;
}

const canAccess = await Authorization.canAccessRoom(room, { _id: this.userId });
const canAccess = await Authorization.canAccessRoom(room, user);
if (!canAccess) {
return false;
}
Expand Down
42 changes: 42 additions & 0 deletions apps/meteor/server/modules/streamer/publication-user-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import type { IUser } from '@rocket.chat/core-typings';
import { Users } from '@rocket.chat/models';

import type { IPublication } from './types';

type CacheEntry = {
user: IUser; // Pick<IUser, '_id' | 'roles'>
timeout: NodeJS.Timeout;
};

const CACHE_PROJECTION = { _id: 1, roles: 1 } as const;
const CACHE_TIMEOUT = 1000 * 60;
const cacheByPublication = new Map<string, CacheEntry>();

export async function getCachedUserForPublication(publication: IPublication): Promise<CacheEntry['user'] | null> {
const userId = publication.userId ?? publication._session?.userId ?? undefined;
if (userId == null || userId === '') {
return null;
}

const value = invalidate(userId);

const user = value ?? (await Users.findOneById<CacheEntry['user']>(userId, { projection: CACHE_PROJECTION }));

if (user) {
const timeout = setTimeout(() => {
invalidate(userId);
}, CACHE_TIMEOUT);

cacheByPublication.set(userId, { user, timeout });
}
return user;
}

export function invalidate(userId: string): CacheEntry['user'] | null {
const entry = cacheByPublication.get(userId);
if (entry) {
clearTimeout(entry.timeout);
cacheByPublication.delete(userId);
}
return entry?.user ?? null;
}
3 changes: 3 additions & 0 deletions apps/meteor/server/services/meteor/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { getURL } from '../../../app/utils/server/getURL';
import { configureEmailInboxes } from '../../features/EmailInbox/EmailInbox';
import { roomCoordinator } from '../../lib/rooms/roomCoordinator';
import { ListenersModule } from '../../modules/listeners/listeners.module';
import { invalidate as invalidatePublicationUserCache } from '../../modules/streamer/publication-user-cache';

const disableMsgRoundtripTracking = ['yes', 'true'].includes(String(process.env.DISABLE_MESSAGE_ROUNDTRIP_TRACKING).toLowerCase());

Expand Down Expand Up @@ -78,6 +79,8 @@ export class MeteorService extends ServiceClassInternal implements IMeteor {
});

this.onEvent('watch.users', async (data) => {
invalidatePublicationUserCache(data.id);
Comment thread
ggazzo marked this conversation as resolved.

if (data.clientAction === 'updated' && data.diff) {
processOnChange(data.diff, data.id);
}
Expand Down
Loading