LogDB backend using MongoDB and FastAPI
Our application consists of five collections:
- HDFS
- admins
- blocks
- upvotes
- web_logs
The HDFS collection contains the HDFS DataXceiver and FS Namesystem logs, which are migrated by the DataXCeiver_migration.py and namesystem_migration.py Python scripts. The structure of the documents is the following:
- _id
- timestamp
- block_id
- source_ip
- destination_ip
- size
- type
The admins collection contains information about the admins of the application. The data are created with faker in admin_creation.py. The structure of the document is the following:
- _id
- username
- email
- phone
The blocks collection contains information about the blocks of the database, since storing them inside the HDFS documents would increase their size. The migration of the data happens in DataXCeiver_migration.py and namesystem_migration.py. The structure of the document contains the following information:
- _id
- block_id
This collection contains information about the upvotes of each administrator. The data for the upvotes are produced in admin_data.py using faker. The script produces the data under two constraints: every administrator must have at most 1000 upvotes, and at least 1/3 of the logs must have at least one upvote. The upvotes document contains the following information:
- _id
- admin_id
- log_id
- log_type
- upvoted_at
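The two generation constraints can be sketched in plain Python. The script below is an illustrative assumption, not the actual admin_data.py; only the field names come from the document structure above.

```python
import random

def generate_upvotes(admin_ids, log_ids, max_per_admin=1000):
    """Sketch of the generation constraints: every admin gets at most
    max_per_admin upvotes, and at least 1/3 of the logs get one upvote."""
    counts = {admin: 0 for admin in admin_ids}
    upvotes = []
    # Pick the third of the logs that must receive at least one upvote.
    for log_id in random.sample(log_ids, k=max(1, len(log_ids) // 3)):
        eligible = [a for a in admin_ids if counts[a] < max_per_admin]
        admin = random.choice(eligible)
        counts[admin] += 1
        # upvoted_at is left out of this sketch; faker would fill it in.
        upvotes.append({"admin_id": admin, "log_id": log_id, "log_type": "hdfs"})
    return upvotes

ups = generate_upvotes(["a1", "a2"], [f"log{i}" for i in range(30)])
```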
The web_logs collection contains logs of web requests, so the following information is stored:
- _id
- remote_host
- remote_logname
- remote_user
- timestamp
- method
- path
- status
- size
- referer
- user_agent
In order to speed up queries we created the following indexes:
The queries on the HDFS collection request information based on the timestamp, therefore the following index is created:
db.HDFS.createIndex({ timestamp: 1 })
Given that the second query requests information based on log type and timestamp, we need the following compound index:
db.HDFS.createIndex({ type: 1, timestamp: 1 })
For the most common logs per source IP we need the following index:
db.HDFS.createIndex({ source_ip: 1 })
As with the HDFS collection, there are queries on the timestamp, so we need the following index:
db.web_logs.createIndex({ timestamp: 1 })
Furthermore, for the least common methods we need the following index on method and timestamp:
db.web_logs.createIndex({ method: 1, timestamp: 1 })
For the referers that lead to more than one resource we need the following index:
db.web_logs.createIndex({ referer: 1 })
In order to find the blocks that were replicated and served the same day we need the following index
db.HDFS.createIndex({ block_id: 1, timestamp: 1 })
And an index on block_id for the upvote queries:
db.blocks.createIndex({ block_id: 1 })
For admins, each username must be unique, therefore we create the following index:
db.admins.createIndex({ username: 1 }, { unique: true })
We should not allow duplicate votes, therefore we created the following unique index:
db.upvotes.createIndex({ admin_id: 1, log_id: 1 },{ unique: true })
For the most upvoted logs we need the following index:
db.upvotes.createIndex({ upvoted_at: 1, log_id: 1 })
For queries based on admin activity we need the following index
db.upvotes.createIndex({ admin_id: 1 })
For the upvotes that correlate the admin and the source ip we need the following indexes
db.upvotes.createIndex({ admin_id: 1, log_type: 1, log_id: 1 })
db.HDFS.createIndex({ _id: 1, source_ip: 1 })
All indexes can be seen in the create_indexes file.
Furthermore, the user can insert logs and upvotes.
Make sure to activate the virtual environment, start MongoDB, and run the app with
python -m uvicorn app.main:app --reload
Find the total logs per type that were created within a specified time range and sort them in descending order:
db.HDFS.aggregate([
// -----------------------------
// HDFS logs
// -----------------------------
{
$match: {
timestamp: {
$gte: ISODate("2005-01-01T00:00:00Z"),
$lte: ISODate("2010-01-01T00:00:00Z")
}
}
},
{
$project: {
type: 1
}
},
// -----------------------------
// UNION web logs as ONE type
// -----------------------------
{
$unionWith: {
coll: "web_logs",
pipeline: [
{
$match: {
timestamp: {
$gte: ISODate("2005-01-01T00:00:00Z"),
$lte: ISODate("2010-01-01T00:00:00Z")
}
}
},
{
$project: {
type: { $literal: "WEB" }
}
}
]
}
},
// -----------------------------
// Group & sort
// -----------------------------
{
$group: {
_id: "$type",
totalLogs: { $sum: 1 }
}
},
{
$sort: { totalLogs: -1 }
}
])
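As a sanity check, the pipeline's union-group-sort logic can be mirrored in plain Python over toy in-memory documents (the sample values are made up):

```python
from collections import Counter

# Toy stand-ins for the two collections (made-up values).
hdfs = [{"type": "receiving"}, {"type": "served"}, {"type": "receiving"}]
web_logs = [{"method": "GET"}, {"method": "POST"}]

# $unionWith: every web log is collapsed into the single type "WEB".
types = [doc["type"] for doc in hdfs] + ["WEB" for _ in web_logs]

# $group by type and $sort by count descending.
totals = Counter(types).most_common()
```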
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/logsPerTypeByDateDesc?from=2000-11-09T20%3A30%3A00Z&to=2015-11-09T20%3A30%3A00Z' \
-H 'accept: application/json'
The results are the following:

db.HDFS.aggregate([
{
$match: {
type: "receiving",
timestamp: {
$gte: ISODate("2008-01-01T00:00:00Z"),
$lte: ISODate("2008-12-31T23:59:59Z")
}
}
},
{
$project: {
timestamp: 1
}
},
{
$unionWith: {
coll: "web_logs",
pipeline: [
{
$match: {
timestamp: {
$gte: ISODate("2008-01-01T00:00:00Z"),
$lte: ISODate("2008-12-31T23:59:59Z")
}
}
},
{
$project: {
timestamp: 1
}
}
]
}
},
{
$group: {
_id: {
day: {
$dateToString: {
format: "%Y-%m-%d",
date: "$timestamp"
}
}
},
totalRequests: { $sum: 1 }
}
},
{
$sort: { "_id.day": 1 }
}
])
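The per-day grouping performed by $dateToString can be mirrored in plain Python with strftime over toy timestamps (sample values are made up):

```python
from collections import Counter
from datetime import datetime

# Toy timestamps from both collections after the date filter (made-up values).
timestamps = [
    datetime(2008, 3, 1, 10), datetime(2008, 3, 1, 23),
    datetime(2008, 3, 2, 5),
]

# Group per calendar day, like $dateToString with format "%Y-%m-%d",
# then sort ascending by day, like the final $sort stage.
per_day = sorted(Counter(t.strftime("%Y-%m-%d") for t in timestamps).items())
```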
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/logsPerDay?from=2008-01-01T00%3A00%3A00Z&to=2008-12-31T23%3A59%3A59Z' \
-H 'accept: application/json'
An example result is the following:

db.HDFS.aggregate([
// -----------------------
// Match specific day
// -----------------------
{
$match: {
source_ip: { $ne: null },
timestamp: {
$gte: ISODate("2008-11-09T00:00:00Z"),
$lt: ISODate("2008-11-10T00:00:00Z")
}
}
},
// -----------------------
// Count per source IP + type
// -----------------------
{
$group: {
_id: {
source_ip: "$source_ip",
type: "$type"
},
total: { $sum: 1 }
}
},
// -----------------------
// Sort by frequency
// -----------------------
{
$sort: { total: -1 }
},
// -----------------------
// Group per source IP
// -----------------------
{
$group: {
_id: "$_id.source_ip",
logs: {
$push: {
type: "$_id.type",
total: "$total"
}
}
}
},
// -----------------------
// Keep top 3 per source IP
// -----------------------
{
$project: {
logs: { $slice: ["$logs", 3] }
}
},
// -----------------------
// Sort IPs (optional)
// -----------------------
{
$sort: { _id: 1 }
}
])
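The group-sort-slice logic of this pipeline can be mirrored in plain Python; the sample documents below are made up:

```python
from collections import Counter, defaultdict

# Toy HDFS documents for a single day (made-up values).
logs = [
    {"source_ip": "10.0.0.1", "type": "receiving"},
    {"source_ip": "10.0.0.1", "type": "receiving"},
    {"source_ip": "10.0.0.1", "type": "served"},
    {"source_ip": "10.0.0.1", "type": "received"},
    {"source_ip": "10.0.0.1", "type": "replicated"},
    {"source_ip": "10.0.0.2", "type": "served"},
]

# Count per (source_ip, type), then keep the 3 most frequent types per IP,
# mirroring the two $group stages and the $slice.
per_ip = defaultdict(Counter)
for log in logs:
    per_ip[log["source_ip"]][log["type"]] += 1

top3 = {ip: counts.most_common(3) for ip, counts in sorted(per_ip.items())}
```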
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/top3logspersource?day=2008-11-09T00%3A00%3A00Z' \
-H 'accept: application/json'
An example query result is the following:

db.web_logs.aggregate([
{
$match: {
timestamp: {
$gte: ISODate("2001-01-01T00:00:00Z"),
$lte: ISODate("2015-12-31T23:59:59Z")
}
}
},
{
$group: {
_id: "$method",
totalRequests: { $sum: 1 }
}
},
{
$sort: { totalRequests: 1 }
},
{
$limit: 2
}
])
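The ascending sort plus limit can be mirrored with a Counter in plain Python over made-up methods:

```python
from collections import Counter

# Toy HTTP methods inside the time window (made-up values).
methods = ["GET", "GET", "GET", "POST", "POST", "HEAD", "PUT"]

# Count per method, sort ascending, keep the two rarest ($sort + $limit).
least_two = sorted(Counter(methods).items(), key=lambda kv: kv[1])[:2]
```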
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/least2CommonhttpMethods?from=2001-01-01T00%3A00%3A00Z&to=2015-01-01T00%3A00%3A00Z' \
-H 'accept: application/json'

db.web_logs.aggregate([
{
$match: {
referer: { $nin: ["-", null, ""] }
}
},
{
$group: {
_id: "$referer",
resources: { $addToSet: "$path" }
}
},
{
$project: {
resourceCount: { $size: "$resources" }
}
},
{
$match: {
resourceCount: { $gt: 1 }
}
},
{
$sort: { resourceCount: -1 }
}
])
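The $addToSet / $size logic can be mirrored with Python sets over toy documents (sample values are made up):

```python
from collections import defaultdict

# Toy web_log documents; "-" marks a missing referer (made-up values).
logs = [
    {"referer": "/index.html", "path": "/a.css"},
    {"referer": "/index.html", "path": "/b.js"},
    {"referer": "/about.html", "path": "/a.css"},
    {"referer": "-", "path": "/x"},
]

# Collect the distinct paths per referer, like $group with $addToSet.
resources = defaultdict(set)
for log in logs:
    if log["referer"] not in ("-", "", None):
        resources[log["referer"]].add(log["path"])

# Keep referers that led to more than one resource, largest count first.
multi = sorted(((ref, len(paths)) for ref, paths in resources.items() if len(paths) > 1),
               key=lambda kv: -kv[1])
```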
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/referersWithMultipleResources' \
-H 'accept: application/json'
db.HDFS.aggregate([
{
$match: {
type: { $in: ["receiving", "received", "served"] }
}
},
{
$addFields: {
day: {
$dateToString: {
format: "%Y-%m-%d",
date: "$timestamp"
}
}
}
},
{
$group: {
_id: {
block_id: "$block_id",
day: "$day"
},
types: { $addToSet: "$type" }
}
},
{
$match: {
types: {
$all: ["served"],
$in: ["receiving", "received"]
}
}
},
{
$project: {
_id: 0,
block_id: "$_id.block_id",
day: "$_id.day",
events: "$types"
}
},
{
$sort: {
day: 1
}
}
])
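The same-day check ($all on "served" combined with $in on the replication types) can be mirrored in plain Python over toy events (sample values are made up):

```python
from collections import defaultdict
from datetime import datetime

# Toy HDFS events (made-up values).
events = [
    {"block_id": "blk_1", "type": "receiving", "timestamp": datetime(2008, 11, 9, 3)},
    {"block_id": "blk_1", "type": "served",    "timestamp": datetime(2008, 11, 9, 17)},
    {"block_id": "blk_2", "type": "served",    "timestamp": datetime(2008, 11, 9, 5)},
]

# Group the event types per (block_id, day), like the $group stage.
per_block_day = defaultdict(set)
for event in events:
    per_block_day[(event["block_id"], event["timestamp"].date())].add(event["type"])

# Keep blocks that were both served and received/receiving on the same day.
result = [(block, str(day)) for (block, day), types in per_block_day.items()
          if "served" in types and types & {"receiving", "received"}]
```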
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/blocksReplicatedAndServedTheSameDay' \
-H 'accept: application/json'
db.upvotes.aggregate([
{
$match: {
upvoted_at: {
$gte: ISODate("2026-01-19T00:00:00Z"),
$lt: ISODate("2026-01-20T00:00:00Z")
}
}
},
{
$group: {
_id: "$log_id",
upvotes: { $sum: 1 }
}
},
{ $sort: { upvotes: -1 } },
{ $limit: 50 }
])
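The day filter, per-log count, and top-50 limit can be mirrored in plain Python over made-up upvote documents:

```python
from collections import Counter
from datetime import datetime

# Toy upvote documents (made-up values).
upvotes = [
    {"log_id": "l1", "upvoted_at": datetime(2026, 1, 19, 10)},
    {"log_id": "l1", "upvoted_at": datetime(2026, 1, 19, 11)},
    {"log_id": "l2", "upvoted_at": datetime(2026, 1, 19, 12)},
    {"log_id": "l3", "upvoted_at": datetime(2026, 1, 20, 1)},  # outside the day
]

day = datetime(2026, 1, 19).date()
# Filter to the requested day, count per log, keep the top 50.
top = Counter(u["log_id"] for u in upvotes
              if u["upvoted_at"].date() == day).most_common(50)
```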
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/top50UpvotedLogsForDay?date=2026-01-19T00:00:00' \
-H 'accept: application/json'
db.upvotes.aggregate([
{
$group: {
_id: "$admin_id",
totalUpvotes: { $sum: 1 }
}
},
{
$sort: { totalUpvotes: -1 }
},
{
$limit: 50
},
{
$lookup: {
from: "admins",
localField: "_id",
foreignField: "_id",
as: "admin"
}
},
{
$unwind: "$admin"
},
{
$project: {
_id: 0,
admin_id: "$_id",
username: "$admin.username",
email: "$admin.email",
phone: "$admin.phone",
totalUpvotes: 1
}
}
])
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/top50ActiveAdmins' \
-H 'accept: application/json'
Find the top fifty administrators, with regard to the total number of source IPs for which they have upvoted logs.
db.upvotes.aggregate([
// Join upvotes with logs based on log_type
{
$lookup: {
from: "HDFS", // or "web_logs"
localField: "log_id",
foreignField: "_id",
as: "log"
}
},
{ $unwind: "$log" },
// Group by admin_id and collect unique source IPs
{
$group: {
_id: "$admin_id",
uniqueSourceIPs: { $addToSet: "$log.source_ip" }
}
},
// Count the number of unique IPs per admin
{
$project: {
totalSourceIPs: { $size: "$uniqueSourceIPs" }
}
},
// Sort descending
{ $sort: { totalSourceIPs: -1 } },
// Limit to top 50
{ $limit: 50 },
// Lookup admin metadata
{
$lookup: {
from: "admins",
localField: "_id",
foreignField: "_id",
as: "admin"
}
},
{ $unwind: "$admin" },
// Project final fields
{
$project: {
_id: 0,
admin_id: "$_id",
username: "$admin.username",
email: "$admin.email",
phone: "$admin.phone",
totalSourceIPs: 1
}
}
])
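The $lookup join and distinct-IP count can be mirrored in plain Python with a dict acting as the HDFS collection (sample values are made up):

```python
from collections import defaultdict

# Toy collections: HDFS keyed by _id, upvotes referencing it (made-up values).
hdfs = {"l1": {"source_ip": "10.0.0.1"},
        "l2": {"source_ip": "10.0.0.2"},
        "l3": {"source_ip": "10.0.0.1"}}
upvotes = [{"admin_id": "a1", "log_id": "l1"},
           {"admin_id": "a1", "log_id": "l2"},
           {"admin_id": "a2", "log_id": "l3"}]

# $lookup + $unwind: resolve each upvote's log, collect distinct source IPs.
ips_per_admin = defaultdict(set)
for up in upvotes:
    log = hdfs.get(up["log_id"])
    if log is not None:
        ips_per_admin[up["admin_id"]].add(log["source_ip"])

# $project + $sort + $limit: count distinct IPs, rank descending, top 50.
ranking = sorted(((admin, len(ips)) for admin, ips in ips_per_admin.items()),
                 key=lambda kv: -kv[1])[:50]
```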
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/top50AdminsBySourceIPs' \
-H 'accept: application/json'
Find all logs for which the same e-mail has been used by more than one username when casting an upvote.
db.upvotes.aggregate([
// Join upvotes - admins
{
$lookup: {
from: "admins",
localField: "admin_id",
foreignField: "_id",
as: "admin"
}
},
{ $unwind: "$admin" },
// Group by log + email
{
$group: {
_id: {
log_id: "$log_id",
email: "$admin.email"
},
usernames: { $addToSet: "$admin.username" }
}
},
// Count distinct usernames
{
$project: {
usernames: 1,
usernameCount: { $size: "$usernames" }
}
},
// Keep only emails used by >1 username
{
$match: {
usernameCount: { $gt: 1 }
}
},
// Clean output
{
$project: {
_id: 0,
log_id: { $toString: "$_id.log_id" },
email: "$_id.email",
usernames: 1,
usernameCount: 1
}
}
])
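The grouping by (log_id, email) can be mirrored in plain Python; the toy admins below deliberately share an email so the result is non-empty:

```python
from collections import defaultdict

# Toy admins that deliberately share an email (made-up values).
admins = {"a1": {"username": "alice", "email": "x@example.com"},
          "a2": {"username": "bob", "email": "x@example.com"}}
upvotes = [{"admin_id": "a1", "log_id": "l1"},
           {"admin_id": "a2", "log_id": "l1"},
           {"admin_id": "a1", "log_id": "l2"}]

# Group the usernames per (log_id, email), like $lookup + $group + $addToSet.
names = defaultdict(set)
for up in upvotes:
    admin = admins[up["admin_id"]]
    names[(up["log_id"], admin["email"])].add(admin["username"])

# Keep only the (log, email) pairs used by more than one username.
suspicious = {key: sorted(users) for key, users in names.items() if len(users) > 1}
```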
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/logsWithSameEmailButDifferentUsernames' \
-H 'accept: application/json'
As we do not have any admins with the same email and different usernames, the results are empty.
db.upvotes.aggregate([
// Join admins to get username
{
$lookup: {
from: "admins",
localField: "admin_id",
foreignField: "_id",
as: "admin"
}
},
{ $unwind: "$admin" },
// Filter by admin username
{
$match: {
"admin.username": "admin_username_here",
"log_type": "hdfs"
}
},
// Join HDFS logs
{
$lookup: {
from: "HDFS",
localField: "log_id",
foreignField: "_id",
as: "hdfs_log"
}
},
{ $unwind: "$hdfs_log" },
// Extract block_id
{
$group: {
_id: "$hdfs_log.block_id"
}
},
// Pretty output
{
$project: {
_id: 0,
block_id: "$_id"
}
}
])
In order to call the corresponding API you simply invoke
curl -X 'GET' \
'http://127.0.0.1:8000/logs/logs/blocksUpvotedByAdmin/dennisanna' \
-H 'accept: application/json'