Skip to content

Commit

Permalink
Di imp Contact/File/OpLogs/PushNotif (#152)
Browse files Browse the repository at this point in the history
* update DI

* fix contact specs

* add Push Event/Subscription DI

* refactor

* File to DI

* Op Logs DI
  • Loading branch information
banshiAnton authored Jan 31, 2025
1 parent 8c0aaa8 commit 795b4af
Show file tree
Hide file tree
Showing 85 changed files with 1,098 additions and 734 deletions.
65 changes: 12 additions & 53 deletions APIs/JSON/controllers/contacts.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
import BaseJSONController from "./base.js"

import { ERROR_STATUES } from "@sama/constants/errors.js"

import ServiceLocatorContainer from "@sama/common/ServiceLocatorContainer.js"

import Contact from "@sama/models/contact.js"

import contactsMatchRepository from "@sama/repositories/contact_match_repository.js"

import { ObjectId } from "@sama/lib/db.js"

import Response from "@sama/networking/models/Response.js"

class ContactsController extends BaseJSONController {
async contact_add(ws, data) {
const { id: requestId, contact_add: contactData } = data

const sessionService = ServiceLocatorContainer.use("SessionService")

const currentUser = sessionService.getSessionUserId(ws)

await contactsMatchRepository.matchContactWithUser(contactData)
contactData.user_id = ObjectId(currentUser)
const contactAddOperation = ServiceLocatorContainer.use("ContactAddOperation")

const contact = new Contact(contactData)
await contact.save()
const contact = await contactAddOperation.perform(ws, contactData, false)

return new Response().addBackMessage({ response: { id: requestId, contact: contact.visibleParams() } })
}
Expand All @@ -35,66 +21,39 @@ class ContactsController extends BaseJSONController {
contact_batch_add: { contacts },
} = data

const contactsList = []
for (let u of contacts) {
if (!u.email || !u.phone) {
continue
}
const contactAddOperation = ServiceLocatorContainer.use("ContactAddOperation")

const contact = (await this.contact_add(ws, { contact_add: u, id: "contact_batch_add" })).backMessages.at(0)
.response.contact
contactsList.push(contact)
}
const contactsList = await contactAddOperation.perform(ws, contacts, true)

return new Response().addBackMessage({ response: { id: requestId, contacts: contactsList } })
}

async contact_update(ws, data) {
const { id: requestId, contact_update: updatedData } = data
const recordId = updatedData.id
delete updatedData["id"]

await contactsMatchRepository.matchContactWithUser(updatedData)
const contactEditOperation = ServiceLocatorContainer.use("ContactEditOperation")

const updatedResult = await Contact.findOneAndUpdate({ _id: recordId }, { $set: updatedData })
const updatedContact = await contactEditOperation.perform(ws, updatedData)

if (!updatedResult.ok) {
throw new Error(ERROR_STATUES.CONTACT_NOT_FOUND.message, {
cause: ERROR_STATUES.CONTACT_NOT_FOUND,
})
}

return new Response().addBackMessage({ response: { id: requestId, contact: updatedResult.value } })
return new Response().addBackMessage({ response: { id: requestId, contact: updatedContact } })
}

async contact_list(ws, data) {
const { id: requestId, contact_list: query } = data

const sessionService = ServiceLocatorContainer.use("SessionService")

const currentUser = sessionService.getSessionUserId(ws).toString()

const queryParams = { user_id: currentUser.toString() }
if (query.updated_at) {
queryParams.updated_at = { $gt: new Date(query.updated_at) }
}
const contactListOperation = ServiceLocatorContainer.use("ContactListOperation")

const contacts = await Contact.findAll(queryParams, null, query.limit)
const contacts = await contactListOperation.perform(ws, query)

return new Response().addBackMessage({ response: { id: requestId, contacts } })
}

async contact_delete(ws, data) {
const {
id: requestId,
contact_delete: { id },
} = data
const { id: requestId, contact_delete } = data

const sessionService = ServiceLocatorContainer.use("SessionService")
const contactDeleteOperation = ServiceLocatorContainer.use("ContactDeleteOperation")

const userId = sessionService.getSessionUserId(ws)
const contact = await Contact.findOne({ _id: id, user_id: userId })
contact && (await contact.delete())
await contactDeleteOperation.perform(ws, contact_delete)

return new Response().addBackMessage({ response: { id: requestId, success: true } })
}
Expand Down
37 changes: 8 additions & 29 deletions APIs/JSON/controllers/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,23 @@ import Response from "@sama/networking/models/Response.js"

class FilesController extends BaseJSONController {
async create_url(ws, data) {
const { id: requestId, create_files: reqFiles } = data
const { id: requestId, create_files } = data

const sessionService = ServiceLocatorContainer.use("SessionService")
const storageService = ServiceLocatorContainer.use("StorageService")
const fileCreateOperation = ServiceLocatorContainer.use("FileCreateOperation")

const currentUserId = sessionService.getSessionUserId(ws)
const createdFiles = await fileCreateOperation.perform(ws, create_files)

const resFiles = []

for (const reqFile of reqFiles) {
const { file, upload_url } = await storageService.createFile(currentUserId, reqFile)

resFiles.push({ ...file, upload_url })
}

return new Response().addBackMessage({ response: { id: requestId, files: resFiles } })
return new Response().addBackMessage({ response: { id: requestId, files: createdFiles } })
}

async get_download_url(ws, data) {
const {
id: requestId,
get_file_urls: { file_ids: objectIds },
} = data

const sessionService = ServiceLocatorContainer.use("SessionService")
const storageService = ServiceLocatorContainer.use("StorageService")

const currentUserId = sessionService.getSessionUserId(ws)

const urls = {}
const { id: requestId, get_file_urls } = data

for (const fileObjectId of objectIds) {
const downloadUrl = await storageService.getFileDownloadUrl(currentUserId, fileObjectId)
const fileDownloadOperation = ServiceLocatorContainer.use("FileDownloadOperation")

urls[fileObjectId] = downloadUrl
}
const downloadUrls = await fileDownloadOperation.perform(ws, get_file_urls)

return new Response().addBackMessage({ response: { id: requestId, file_urls: urls } })
return new Response().addBackMessage({ response: { id: requestId, file_urls: downloadUrls } })
}
}

Expand Down
17 changes: 3 additions & 14 deletions APIs/JSON/controllers/operations_log.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,12 @@ const mapOpLogsMessage = async function (mapper) {

class OperationsLogController extends BaseJSONController {
async logs(ws, data) {
const {
id: requestId,
op_log_list: {
created_at: { gt, lt },
},
} = data
const { id: requestId, op_log_list } = data

const sessionService = ServiceLocatorContainer.use("SessionService")
const opLogsListOperation = ServiceLocatorContainer.use("OpLogsListOperation")

const currentUserId = sessionService.getSessionUserId(ws)
const opLogs = await opLogsListOperation.perform(ws, op_log_list)

const query = {
user_id: currentUserId,
created_at: gt ? { $gt: new Date(gt) } : { $lt: new Date(lt) },
}

const opLogs = await OpLog.findAll(query, ["user_id", "packet"])
const packet = { response: { id: requestId, logs: opLogs } }

return new Response().addBackMessage(new MappableMessage(packet, mapOpLogsMessage))
Expand Down
89 changes: 15 additions & 74 deletions APIs/JSON/controllers/push_notifications.js
Original file line number Diff line number Diff line change
@@ -1,41 +1,16 @@
import BaseJSONController from "./base.js"

import { ERROR_STATUES } from "@sama/constants/errors.js"

import RuntimeDefinedContext from "@sama/store/RuntimeDefinedContext.js"
import ServiceLocatorContainer from "@sama/common/ServiceLocatorContainer.js"

import PushSubscription from "@sama/models/push_subscription.js"

import { ObjectId } from "@sama/lib/db.js"

import Response from "@sama/networking/models/Response.js"
import CreatePushEventOptions from "@sama/lib/push_queue/models/CreatePushEventOptions.js"

class PushNotificationsController extends BaseJSONController {
async push_subscription_create(ws, data) {
const {
id: requestId,
push_subscription_create: { web_endpoint, web_key_auth, web_key_p256dh, device_token, device_udid },
} = data

const sessionService = ServiceLocatorContainer.use("SessionService")

const userId = sessionService.getSessionUserId(ws)
let pushSubscription = new PushSubscription(
(
await PushSubscription.findOneAndUpdate(
{ user_id: userId, device_udid },
{ $set: { web_endpoint, web_key_auth, web_key_p256dh, device_token } }
)
)?.value
)

if (!pushSubscription.params) {
data.push_subscription_create["user_id"] = new ObjectId(userId)
pushSubscription = new PushSubscription(data.push_subscription_create)
await pushSubscription.save()
}
const { id: requestId, push_subscription_create } = data

const pushSubscriptionCreateOperation = ServiceLocatorContainer.use("PushSubscriptionCreateOperation")

const pushSubscription = await pushSubscriptionCreateOperation.perform(ws, push_subscription_create)

return new Response().addBackMessage({
response: {
Expand All @@ -46,65 +21,31 @@ class PushNotificationsController extends BaseJSONController {
}

async push_subscription_list(ws, data) {
const {
id: requestId,
push_subscription_list: { user_id },
} = data
const { id: requestId } = data

const pushSubscriptionListOperation = ServiceLocatorContainer.use("PushSubscriptionListOperation")

const subscriptions = await PushSubscription.findAll({ user_id })
const subscriptions = await pushSubscriptionListOperation.perform(ws, {})

return new Response().addBackMessage({ response: { id: requestId, subscriptions } })
}

async push_subscription_delete(ws, data) {
const {
id: requestId,
push_subscription_delete: { device_udid },
} = data

const sessionService = ServiceLocatorContainer.use("SessionService")
const { id: requestId, push_subscription_delete } = data

const userId = sessionService.getSessionUserId(ws)
const pushSubscriptionRecord = await PushSubscription.findOne({
device_udid,
user_id: userId,
})
if (!pushSubscriptionRecord) {
throw new Error(ERROR_STATUES.NOTIFICATION_NOT_FOUND.message, {
cause: ERROR_STATUES.NOTIFICATION_NOT_FOUND,
})
}
const pushSubscriptionDeleteOperation = ServiceLocatorContainer.use("PushSubscriptionDeleteOperation")

await pushSubscriptionRecord.delete()
await pushSubscriptionDeleteOperation.perform(ws, push_subscription_delete)

return new Response().addBackMessage({ response: { id: requestId, success: true } })
}

async push_event_create(ws, data) {
const {
id: requestId,
push_event_create: { recipients_ids, message },
} = data

const userService = ServiceLocatorContainer.use("UserService")

const recipients = await userService.userRepo.retrieveExistedIds(recipients_ids)

if (!recipients.length) {
throw new Error(ERROR_STATUES.RECIPIENTS_NOT_FOUND.message, {
cause: ERROR_STATUES.RECIPIENTS_NOT_FOUND,
})
}
const { id: requestId, push_event_create } = data

const sessionService = ServiceLocatorContainer.use("SessionService")

const userId = sessionService.getSessionUserId(ws)

const createPushEventOptions = new CreatePushEventOptions(userId, message, {
user_ids: recipients_ids,
})
const pushEventCreateOperation = ServiceLocatorContainer.use("PushEventCreateOperation")

const pushEvent = await RuntimeDefinedContext.PUSH_QUEUE_DRIVER.createPushEvent(createPushEventOptions)
const pushEvent = await pushEventCreateOperation.perform(ws, push_event_create)

return new Response().addBackMessage({ response: { id: requestId, event: pushEvent.visibleParams() } })
}
Expand Down
2 changes: 1 addition & 1 deletion APIs/XMPP
Submodule XMPP updated from c30068 to aa7ac7
43 changes: 16 additions & 27 deletions app/cluster/cluster_syncer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import RuntimeDefinedContext from "../store/RuntimeDefinedContext.js"

import clusterManager from "./cluster_manager.js"

import ClusterNode from "../models/cluster_node.js"

import ServiceLocatorContainer from "@sama/common/ServiceLocatorContainer.js"

class ClusterSyncer {
Expand Down Expand Up @@ -31,7 +29,8 @@ class ClusterSyncer {
}

async #retrieveExistingClusterNodes() {
const nodeList = await ClusterNode.findAll({}, ["ip_address", "hostname", "port"])
const clusterNodeService = ServiceLocatorContainer.use("ClusterNodeService")
const nodeList = await clusterNodeService.retrieveAll()

// initiate connect to other node
nodeList.forEach(async (n) => {
Expand All @@ -52,36 +51,26 @@ class ClusterSyncer {
// if some node is gone, we may need to do some cleaning ?
}

async #storeCurrentNode({ ip_address, hostname, port, users_count }) {
if (await ClusterNode.findOne({ ip_address, hostname, port })) {
await ClusterNode.updateOne(
{ ip_address, hostname, port },
{
$set: { updated_at: new Date(), users_count },
}
)
} else {
const newNode = new ClusterNode({
ip_address,
hostname,
port,
users_count,
})
await newNode.save()
}
}

async #syncCluster() {
const sessionService = ServiceLocatorContainer.use("SessionService")
async #storeCurrentNode(usersCount) {
const clusterNodeService = ServiceLocatorContainer.use("ClusterNodeService")

const clusterNodeParams = {
const addressParams = {
ip_address: RuntimeDefinedContext.APP_IP,
hostname: RuntimeDefinedContext.APP_HOSTNAME,
port: RuntimeDefinedContext.CLUSTER_PORT,
users_count: sessionService.sessionsTotal,
}

await this.#storeCurrentNode(clusterNodeParams)
const optionalParams = {
users_count: usersCount,
}

await clusterNodeService.create(addressParams, optionalParams)
}

async #syncCluster() {
const sessionService = ServiceLocatorContainer.use("SessionService")

await this.#storeCurrentNode(sessionService.sessionsTotal)

await this.#retrieveExistingClusterNodes()
}
Expand Down
Loading

0 comments on commit 795b4af

Please sign in to comment.