Skip to content

Commit

Permalink
Record metrics, Improve response times (#11)
Browse files Browse the repository at this point in the history
* Improve response times

* Fix linter errors

* Measure request durations

* Lint/format

* Change logging format

* Logging fix
  • Loading branch information
anchal00 committed Sep 6, 2024
1 parent 2ed912c commit adc89ae
Show file tree
Hide file tree
Showing 21 changed files with 220 additions and 96 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[flake8]
exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,.venv
max-line-length = 120
extend-ignore = E203
4 changes: 0 additions & 4 deletions INIT

This file was deleted.

12 changes: 2 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,5 @@ To workaround this, you may


#### [1] Running Optimus with Root privileges:

(There might be better ways to do this, but this is the temporary solution available at the moment)

Once the existing resolver is disabled and Port 53 is freed up

1. Copy the script `INIT` to `/usr/local/bin`
2. Rename `INIT` to `optimus_server`
3. Make `optimus_server` executable using `chmod +x /usr/local/bin/optimus_server`
4. Now you can invoke Optimus using this executable from your terminal simply by running `optimus_server`

1. Grant executable permission to script `run` by running `chmod +x run`.
2. Execute `./run` to stop systemd resolver daemon and point your system to use optimus
6 changes: 3 additions & 3 deletions optimus/cli/optimus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from argparse import ArgumentParser

from optimus.__version__ import VERSION
from optimus.optimus_server.udp import run_udp_listener
from optimus.server.udp_listener import run_forever


def main(argv):
DEFAULT_WORKER_THREADS = 10
DEFAULT_WORKER_THREADS = 9
DEFAULT_PORT = 53

arg_parser = ArgumentParser(
Expand All @@ -32,7 +32,7 @@ def main(argv):
arg_parser.add_argument("-v", action="store_true", help="Get version info")
args = arg_parser.parse_args(argv)
if args.r:
run_udp_listener(args.p, args.t)
run_forever(args.p, args.t)
elif args.v:
print(f"Optimus Version: {VERSION}")
else:
Expand Down
4 changes: 2 additions & 2 deletions optimus/dns/models/packet.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from enum import Enum
from typing import List, Optional, Union

from optimus.dns.models.records import AAAA, NS, A, Record, RecordClass, RecordType
from optimus.dns.models.records import AAAA, NS, SOA, A, Record, RecordClass, RecordType


class ResponseCode(Enum): # 4 bits
Expand Down Expand Up @@ -169,7 +169,7 @@ def __init__(
dns_header: DNSHeader,
questions: List[Question],
answers: Optional[List[Record]] = None,
nameserver_records: Optional[List[NS]] = None,
nameserver_records: Optional[List[Union[NS, SOA]]] = None,
additional_records: Optional[List[Union[A, AAAA, NS]]] = None,
) -> None:
self.header = dns_header
Expand Down
8 changes: 7 additions & 1 deletion optimus/dns/models/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ class RecordType(Enum): # 2 bytes
NS = 2 # Name Server : The DNS server address for a domain
CNAME = 5 # Canonical Name : Maps names to names
SOA = 6 # Marks the start of a zone of authority
PTR = 12 # Pointer record for reverse DNS lookups
MX = 15 # Mail exchange : The host of the mail server for a domain
TXT = 16 # Text record, for storing arbitrary text
AAAA = 28 # IPv6 alias : IPv6 address of a host
OPT_RR = 41 # OPT-pseudo RR or meta RR
OPT = 41 # OPT-pseudo RR or meta RR
HTTPS = 65
URI = 265
TEST1 = 65535
TEST2 = 0
UNKNOWN = -1

@classmethod
Expand Down
17 changes: 3 additions & 14 deletions optimus/dns/parser/parse.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
from ipaddress import IPv4Address, IPv6Address
from typing import List, Union

from optimus.dns.parser.iter import BytearrayIterator
from optimus.dns.models.packet import DNSHeader, DNSPacket, Question, ResponseCode
from optimus.dns.models.records import (
AAAA,
CNAME,
MX,
NS,
SOA,
A,
OptPseudoRR,
Record,
RecordClass,
RecordType,
)
from optimus.dns.models.records import AAAA, CNAME, MX, NS, SOA, A, OptPseudoRR, Record, RecordClass, RecordType
from optimus.dns.parser.iter import BytearrayIterator


class DNSParser:
Expand Down Expand Up @@ -199,7 +188,7 @@ def __read_response_records(
self.__to_int(self.__iter.get_n_bytes_and_move(4)),
self.__to_int(self.__iter.get_n_bytes_and_move(4)),
)
elif rtype.value == RecordType.OPT_RR.value:
elif rtype.value == RecordType.OPT.value:
# rclass = self.__iter.get_n_bytes_and_move(2)
# ttl = self.__iter.get_n_bytes_and_move(4)
# length = self.__iter.get_n_bytes_and_move(2)
Expand Down
48 changes: 24 additions & 24 deletions optimus/dns/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,33 @@
from typing import List, Union

from optimus.dns.models.packet import DNSHeader, DNSPacket, Question, ResponseCode
from optimus.dns.parser.parse import DNSParser
from optimus.dns.models.records import AAAA, NS, A, Record, RecordClass, RecordType
from optimus.dns.parser.parse import DNSParser
from optimus.logging.logger import log_error
from optimus.networking.udp import query_server_over_udp

# TODO: Move to a config file and read from file instead of hardcoding
ROOT_SERVERS = [
"198.41.0.4", # a.root-servers.net_ip
"199.9.14.201", # b.root-servers.net_ip
"192.33.4.12", # c.root-servers.net_ip
"199.7.91.13", # d.root-servers.net_ip
"192.203.230.10", # e.root-servers.net_ip
"192.5.5.241", # f.root-servers.net_ip
"192.112.36.4", # g.root-servers.net_ip
"198.97.190.53", # h.root-servers.net_ip
"192.36.148.17", # i.root-servers.net_ip
"192.58.128.30", # j.root-servers.net_ip
"193.0.14.129", # k.root-servers.net_ip
"199.7.83.42", # l.root-servers.net_ip
"202.12.27.33", # m.root-servers.net_ip
]
from optimus.server.context import get_root_servers


# TODO: Improve logging
def resolve(qpacket: DNSPacket) -> DNSPacket:
# Start with first lookup on a random root server
server_addr: str = random.choice(ROOT_SERVERS)
server_addr: str = random.choice(get_root_servers())
while True:
response_packet: DNSPacket = DNSParser(
bytearray(query_server_over_udp(qpacket.to_bin(), server_addr))
).get_dns_packet()
_bytes: bytes = query_server_over_udp(qpacket.to_bin(), server_addr)
# TODO: Implement retries
if not _bytes:
log_error(
f"Resolution of {qpacket.questions[0].name} TYPE {qpacket.questions[0].rtype} ON {server_addr} failed"
)
return DNSPacket(
DNSHeader(
id=qpacket.header.ID,
question_count=qpacket.header.question_count,
response_code=ResponseCode.SERVFAIL,
),
questions=qpacket.questions,
)
response_packet: DNSPacket = DNSParser(bytearray(_bytes)).get_dns_packet()
response_code: ResponseCode = response_packet.header.response_code
# If the server responds with error or if we get the Answer, return the packet as it is
if response_code.value in [
Expand All @@ -53,16 +50,19 @@ def resolve(qpacket: DNSPacket) -> DNSPacket:
return response_packet
# Try to find a 'NS' type record with a corresponding 'A' type record in the additional section
# If found, switch Nameserver and retry the loop i.e perform the lookup on new NameServer again
ns_record_set: set[str] = {ns_rec.nsdname for ns_rec in response_packet.nameserver_records}
ns_records: List[NS] = list(filter(lambda rec: rec.rtype == RecordType.NS, response_packet.nameserver_records))

Check failure on line 53 in optimus/dns/resolver.py

View workflow job for this annotation

GitHub Actions / Build

Argument 1 to "filter" has incompatible type "Callable[[Any], bool]"; expected "Callable[[Union[NS, SOA]], TypeGuard[NS]]" [arg-type]
ns_record_set: set[str] = {ns_rec.nsdname for ns_rec in ns_records}
additional_records: List[Union[A, AAAA, NS]] = response_packet.additional_records
if additional_records:
for ad_rec in additional_records:
if ad_rec.rtype.value == RecordType.A.value and ad_rec.name in ns_record_set:
server_addr = str(ad_rec.address)

Check failure on line 59 in optimus/dns/resolver.py

View workflow job for this annotation

GitHub Actions / Build

Item "NS" of "Union[A, AAAA, NS]" has no attribute "address" [union-attr]
break
else:
if not ns_records:
return response_packet
# Pick a random NS record and perform lookup for that
ns_record: NS = random.choice(response_packet.nameserver_records)
ns_record: NS = random.choice(ns_records)
packet: DNSPacket = resolve(
DNSPacket(
dns_header=DNSHeader(
Expand Down
2 changes: 1 addition & 1 deletion optimus/logging/logger.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logging.basicConfig(level=logging.INFO, format="ts=%(asctime)s level=%(levelname)s message=%(message)s")


# TODO: Improve logging
Expand Down
32 changes: 32 additions & 0 deletions optimus/networking/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import socket
from typing import Optional


class SocketCacheMeta(type):
__INSTANCE: dict[str, "SocketCache"] = dict()

def __new__(cls, cls_name, cls_bases, cls_attrs):
if cls_name not in cls.__INSTANCE:
cls.__INSTANCE[cls_name] = type(cls_name, cls_bases, cls_attrs)
return cls.__INSTANCE[cls_name]


class SocketCache(metaclass=SocketCacheMeta):
def __init__(
self,
) -> None:
self.cache: dict[str, socket.socket] = dict()

def put(self, server_addr: str, sock: socket.socket) -> None:
self.cache[server_addr] = sock

def get(self, server_addr: str) -> Optional[socket.socket]:
return self.cache.get(server_addr)

def delete(self, server_addr: str) -> None:
if server_addr not in self.cache:
return
del self.cache[server_addr]


socket_cache = SocketCache()
28 changes: 13 additions & 15 deletions optimus/networking/udp.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import socket
from typing import Optional

from optimus.logging.logger import log_error
from optimus.networking.cache import socket_cache


def query_server_over_udp(bin_data: bytearray, server_addr: str) -> bytes:
"""
Connects to given `server_addr` over UDP on port 53, sends given `bin_data`
and returns back the response
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(3)
sock.connect((server_addr, 53))
sock: Optional[socket.socket] = socket_cache.get(server_addr)
if not sock:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5)
sock.connect((server_addr, 53))
socket_cache.put(server_addr, sock)
sock.send(bin_data)
packet_bytes = sock.recv(600) # Read 600 bytes only for now
packet_bytes = sock.recv(600)
return packet_bytes
except socket.timeout as texc:
except socket.timeout:
log_error(f"Error: Time out, couldn't complete lookup on {server_addr}")
raise Exception("Socket Timeout Error") from texc
except socket.error as err:
return bytes()
except socket.error:
log_error(f"Error: Socket error while connecting to {server_addr}")
raise Exception("Socket Error") from err
finally:
sock.shutdown(socket.SHUT_RDWR)
sock.close()
return bytes()
37 changes: 37 additions & 0 deletions optimus/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from prometheus_client import Counter, Histogram, start_http_server

from optimus.logging.logger import log

inbound_rqc = Counter("inbound_dns_requests", "Total Requests Received")
served_rqc = Counter("served_dns_requests", "Total Requests Processed")
erred_rqc = Counter("erred_dns_requests", "Total Requests Failed")

req_duration_hist = Histogram("duration_dns_request", "Total time taken to process the request")

PORT = 8000


def record_metrics(func):
"""
Requires a running Prometheus Metrics server
TODO: Check whether Prometheus server is up and running on `PORT`
"""

@req_duration_hist.time()
def wrapper(*args, **kwargs):
inbound_rqc.inc(1)
was_success: bool = func(*args, **kwargs)

Check failure on line 23 in optimus/prometheus.py

View workflow job for this annotation

GitHub Actions / Build

By default the bodies of untyped functions are not checked, consider using --check-untyped-defs [annotation-unchecked]
served_rqc.inc(1)
if not was_success:
erred_rqc.inc(1)

return wrapper


def with_prometheus_metrics_server(func):
def wrapper(*args, **kwargs):
start_http_server(PORT)
log(f"Started Prometheus Server on Port {PORT}")
func(*args, **kwargs)

return wrapper
18 changes: 18 additions & 0 deletions optimus/root_servers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"servers": [
"198.41.0.4",
"199.9.14.201",
"192.33.4.12",
"199.7.91.13",
"192.203.230.10",
"192.5.5.241",
"192.112.36.4",
"198.97.190.53",
"192.36.148.17",
"192.58.128.30",
"193.0.14.129",
"199.7.83.42",
"202.12.27.33"
]
}

File renamed without changes.
36 changes: 36 additions & 0 deletions optimus/server/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import json
import os
import pathlib
import posixpath
import socket
from typing import List

__NAMESERVERS: List[str] = []


def get_root_servers():
global __NAMESERVERS
optimus_root = pathlib.Path(os.path.abspath(os.path.dirname(__file__))).parent
if not __NAMESERVERS:
file_path = os.path.join(optimus_root, "root_servers.json")
if not posixpath.exists(file_path):
raise Exception("root servers file not found !")
with open(file_path, "r") as f:
__NAMESERVERS = json.load(f)["servers"]
return __NAMESERVERS


def warmup_cache(cache):
def inner(func):
def wrapper(*args, **kwargs):
addresses = get_root_servers()
for addr in addresses:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5)
sock.connect((addr, 53))
cache.put(addr, sock)
func(*args, **kwargs)

return wrapper

return inner
14 changes: 10 additions & 4 deletions optimus/optimus_server/router.py → optimus/server/router.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import socket
from optimus.dns.models.packet import DNSPacket

from optimus.dns.models.packet import DNSPacket, ResponseCode
from optimus.dns.parser.parse import DNSParser
from optimus.logging.logger import log
from optimus.dns.resolver import resolve
from optimus.logging.logger import log, log_error
from optimus.prometheus import record_metrics


def handle_request(master_socket: socket.socket, received_bytes: bytes, return_address: tuple[str, int]) -> None:
@record_metrics
def handle_request(master_socket: socket.socket, received_bytes: bytes, return_address: tuple[str, int]) -> bool:
query_packet: DNSPacket = DNSParser(bytearray(received_bytes)).get_dns_packet()
# TODO: Send query for each question in query_packet
log(f"Received query for {query_packet.questions[0].name} TYPE {query_packet.questions[0].rtype}")
response_packet: DNSPacket = resolve(query_packet)
response_packet.header.is_recursion_available = True
master_socket.sendto(response_packet.to_bin(), return_address)
if response_packet.header.response_code != ResponseCode.NOERROR:
log_error(f"Query for {query_packet.questions[0].name} TYPE {query_packet.questions[0].rtype} errored out")
return False
log(f"Query for {query_packet.questions[0].name} TYPE {query_packet.questions[0].rtype} successfully processed")
return True
Loading

0 comments on commit adc89ae

Please sign in to comment.