-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdns_server.py
More file actions
83 lines (67 loc) · 2.7 KB
/
dns_server.py
File metadata and controls
83 lines (67 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from ipaddress import IPv4Address, IPv6Address
from dnslib import A, AAAA, QTYPE, RR
from dnslib.server import BaseResolver, DNSServer
def parse_dynamic_ipv4(label):
parts = label.split("-")
if len(parts) != 4 or not all(part.isdigit() for part in parts):
return None
try:
return str(IPv4Address(".".join(parts)))
except ValueError:
return None
def parse_dynamic_ipv6(label):
# IPv6 label format:
# 1) full hextets: 2001-db8-12-34-0-0-0-1 -> 2001:db8:12:34:0:0:0:1
# 2) compressed: 2001-db8--1 -> 2001:db8::1 (double hyphen maps to ::)
if "--" in label:
if label.count("--") > 1:
return None
candidate = label.replace("--", "::", 1).replace("-", ":")
else:
candidate = label.replace("-", ":")
try:
return str(IPv6Address(candidate))
except ValueError:
return None
class DynamicResolver(BaseResolver):
def resolve(self, request, handler):
qname = str(request.q.qname).lower().strip(".")
reply = request.reply()
qtype = request.q.qtype
base_domain = "flares.cloud"
if qname.endswith(".ip." + base_domain):
ipv4 = parse_dynamic_ipv4(qname.split(".")[0])
if not ipv4:
reply.header.rcode = 3 # NXDOMAIN
elif qtype == QTYPE.A:
reply.add_answer(RR(qname, QTYPE.A, rdata=A(ipv4), ttl=300))
else:
reply.header.rcode = 0
elif qname.endswith(".ip6." + base_domain) or qname.endswith(".ipv6." + base_domain):
ipv6 = parse_dynamic_ipv6(qname.split(".")[0])
if not ipv6:
reply.header.rcode = 3 # NXDOMAIN
elif qtype == QTYPE.AAAA:
reply.add_answer(RR(qname, QTYPE.AAAA, rdata=AAAA(ipv6), ttl=300))
else:
reply.header.rcode = 0
elif qname.endswith(base_domain):
if qtype == QTYPE.A:
reply.add_answer(RR(qname, QTYPE.A, rdata=A("104.21.55.142"), ttl=300))
reply.add_answer(RR(qname, QTYPE.A, rdata=A("172.67.149.32"), ttl=300))
elif qtype == QTYPE.AAAA:
reply.add_answer(RR(qname, QTYPE.AAAA, rdata=AAAA("2606:4700:3034::ac43:9520"), ttl=300))
reply.add_answer(RR(qname, QTYPE.AAAA, rdata=AAAA("2606:4700:3030::6815:378e"), ttl=300))
else:
reply.header.rcode = 0
else:
reply.header.rcode = 3 # NXDOMAIN
return reply
if __name__ == "__main__":
resolver = DynamicResolver()
server = DNSServer(resolver, port=53, address="::")
print("Starting DNS server")
try:
server.start()
except KeyboardInterrupt:
server.stop()