Skip to content

Commit 53742be

Browse files
emaicusbmcutler
authored andcommitted
Add tutorial 17 - dispatched actions & stdin
1 parent ea871c8 commit 53742be

File tree

5 files changed

+379
-0
lines changed

5 files changed

+379
-0
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
{
2+
"autograding" : {
3+
"work_to_details" : ["**/*.txt"]
4+
},
5+
"docker_enabled" : true,
6+
"testcases" : [
7+
{
8+
// Student-visible testcase name.
9+
"title" : "Docker STDIN Test",
10+
11+
// Commands to run (in order). These are not shell commands, although
12+
// they support some common shell wildcards. This can either be a
13+
// list or a single string.
14+
"containers" : [
15+
{
16+
"commands" : [ "python3 solution.py" ]
17+
},
18+
{
19+
"commands" : [ "python3 solution.py" ]
20+
},
21+
{
22+
"container_name" : "router",
23+
"commands" : [ "python3 router.py" ]
24+
}
25+
],
26+
27+
"dispatcher_actions" : [
28+
{
29+
"action" : "delay",
30+
"seconds" : 2
31+
},
32+
{
33+
"containers" : ["container0"],
34+
"action" : "stdin",
35+
"string" : "Hi there! I'm container0\n"
36+
},
37+
{
38+
"containers" : ["container1"],
39+
"action" : "stdin",
40+
"string" : "Hi there! I'm container1\n"
41+
}
42+
],
43+
44+
// Point value of this testcase.
45+
"points" : 10,
46+
47+
"validation" : [
48+
{
49+
// Grade by "diffing" the student output with an
50+
// instructor-provided file.
51+
"method" : "diff",
52+
// The student's container0 output.
53+
"actual_file" : "container0/STDOUT.txt",
54+
// The title seen by students.
55+
"description" : "Container 0 Output",
56+
// The instructor-provided file (the correct answer).
57+
"expected_file" : "expected_output_0.txt"
58+
},
59+
{
60+
// Grade by "diffing" the student output with an
61+
// instructor-provided file.
62+
"method" : "diff",
63+
// The student's container0 output.
64+
"actual_file" : "container1/STDOUT.txt",
65+
// The title seen by students.
66+
"description" : "Container 1 Output",
67+
// The instructor-provided file (the correct answer).
68+
"expected_file" : "expected_output_1.txt"
69+
}
70+
]
71+
}
72+
]
73+
}
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
import socket
2+
import sys
3+
import csv
4+
import traceback
5+
import queue
6+
import datetime
7+
import errno
8+
from time import sleep
9+
import os
10+
11+
LOG_FILE = 'router_log.txt'
12+
13+
'''
14+
SWITCHBOARD is a dict of the form
15+
{
16+
PORT_NUMBER : {
17+
sender : HOST_NAME
18+
recipient : HOST_NAME
19+
}
20+
...
21+
}
22+
'''
23+
SWITCHBOARD = {}
24+
25+
#PORTS contains a list of all ports that we have to listen at.
26+
PORTS = list()
27+
28+
# A queue determining message processing order.
29+
QUEUE = queue.PriorityQueue()
30+
# Global control variable
31+
RUNNING = True
32+
33+
34+
##################################################################################################################
35+
# HELPER FUNCTIONS
36+
##################################################################################################################
37+
def convert_queue_obj_to_string(obj):
38+
str = '\tSENDER: {0}\n\tRECIPIENT: {1}\n\tPORT: {2}\n\tCONTENT: {3}'.format(obj['sender'], obj['recipient'], obj['port'], obj['message'])
39+
return str
40+
41+
def log(line):
42+
if os.path.exists(LOG_FILE):
43+
append_write = 'a' # append if already exists
44+
else:
45+
append_write = 'w' # make a new file if not
46+
with open(LOG_FILE, mode=append_write) as out_file:
47+
out_file.write(line + '\n')
48+
out_file.flush()
49+
print(line)
50+
sys.stdout.flush()
51+
52+
53+
#knownhosts_tcp.csv and knownhosts_udp.csv are of the form
54+
#sender,recipient,port_number
55+
# such that sender sends all communications to recipient via port_number.
56+
def build_switchboard():
57+
try:
58+
#Read the known_hosts.csv see the top of the file for the specification
59+
for connection_type in ["tcp", "udp"]:
60+
filename = 'knownhosts_{0}.csv'.format(connection_type)
61+
with open(filename, 'r') as infile:
62+
reader = csv.reader(infile)
63+
for sender, recipient, port in reader:
64+
#Strip away trailing or leading whitespace
65+
sender = '{0}_Actual'.format(sender.strip())
66+
recipient = '{0}_Actual'.format(recipient.strip())
67+
port = port.strip()
68+
69+
if not port in PORTS:
70+
PORTS.append(port)
71+
else:
72+
raise SystemExit("ERROR: port {0} was encountered twice. Please keep all ports independant.".format(port))
73+
74+
SWITCHBOARD[port] = {}
75+
SWITCHBOARD[port]['connection_type'] = connection_type
76+
SWITCHBOARD[port]['sender'] = sender
77+
SWITCHBOARD[port]['recipient'] = recipient
78+
SWITCHBOARD[port]['connected'] = False
79+
SWITCHBOARD[port]['connection'] = None
80+
81+
82+
except IOError as e:
83+
log("ERROR: Could not read {0}.".format(filename))
84+
log(traceback.format_exc())
85+
except ValueError as e:
86+
log("ERROR: {0} was improperly formatted. Please include lines of the form (SENDER, RECIPIENT, PORT)".format(filename))
87+
except Exception as e:
88+
log('Encountered an error while reading and parsing {0}'.format(filename))
89+
log(traceback.format_exc())
90+
91+
92+
##################################################################################################################
93+
# OUTGOING CONNECTION/QUEUE FUNCTIONS
94+
##################################################################################################################
95+
96+
97+
def connect_outgoing_socket(port):
98+
if SWITCHBOARD[port]['connected']:
99+
return
100+
101+
connection_type = SWITCHBOARD[port]["connection_type"]
102+
103+
recipient = SWITCHBOARD[port]['recipient']
104+
server_address = (recipient, int(port))
105+
106+
if connection_type == 'tcp':
107+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
108+
sock.connect(server_address)
109+
else:
110+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
111+
112+
#We catch errors one level up.
113+
name = recipient.replace('_Actual', '')
114+
log("Established outgoing connection to {0} on port {1}".format(name, port))
115+
SWITCHBOARD[port]['connected'] = True
116+
SWITCHBOARD[port]['outgoing_socket'] = sock
117+
118+
def send_outgoing_message(data):
119+
try:
120+
port = data['port']
121+
message = data['message']
122+
sock = SWITCHBOARD[port]['outgoing_socket']
123+
recipient = data['recipient']
124+
except:
125+
log("An error occurred internal to the router. Please report the following error to a Submitty Administrator")
126+
log(traceback.format_exc())
127+
try:
128+
if SWITCHBOARD[port]['connection_type'] == 'tcp':
129+
sock.sendall(message)
130+
else:
131+
destination_address = (recipient, int(port))
132+
sock.sendto(message,destination_address)
133+
log('Sent message {!r} to {}'.format(message,recipient.replace('_Actual', '')))
134+
except:
135+
log('Could not deliver message {!r} to {}'.format(message,recipient))
136+
SWITCHBOARD[port]['connected'] = False
137+
SWITCHBOARD[port]['connection'].close()
138+
SWITCHBOARD[port]['connection'] = None
139+
140+
def process_queue():
141+
still_going = True
142+
while still_going:
143+
try:
144+
now = datetime.datetime.now()
145+
#priority queue has no peek function due to threading issues.
146+
# as a result, pull it off, check it, then put it back on.
147+
value = QUEUE.get_nowait()
148+
if value[0] <= now:
149+
send_outgoing_message(value[1])
150+
else:
151+
QUEUE.put(value)
152+
still_going = False
153+
except queue.Empty:
154+
still_going = False
155+
156+
157+
##################################################################################################################
158+
# INCOMING CONNECTION FUNCTIONS
159+
##################################################################################################################
160+
161+
def connect_incoming_sockets():
162+
for port in PORTS:
163+
open_incoming_socket(port)
164+
165+
def open_incoming_socket(port):
166+
# Create a TCP/IP socket
167+
168+
connection_type = SWITCHBOARD[port]['connection_type']
169+
sender = SWITCHBOARD[port]['sender']
170+
171+
if connection_type == "tcp":
172+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
173+
elif connection_type == "udp":
174+
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
175+
else:
176+
log("ERROR: bad connection type {0}. Please contact an administrator".format(connection_type))
177+
sys.exit(1)
178+
179+
#Bind the socket to the port
180+
server_address = ('', int(port))
181+
sock.bind(server_address)
182+
sock.setblocking(False)
183+
184+
log('Bound socket port {0}'.format(port))
185+
186+
if connection_type == 'tcp':
187+
#listen for at most 1 incoming connections at a time.
188+
sock.listen(1)
189+
190+
SWITCHBOARD[port]['incoming_socket'] = sock
191+
192+
if connection_type == 'udp':
193+
SWITCHBOARD[port]['connection'] = sock
194+
195+
def listen_to_sockets():
196+
for port in PORTS:
197+
198+
try:
199+
connection_type = SWITCHBOARD[port]["connection_type"]
200+
if connection_type == 'tcp':
201+
if SWITCHBOARD[port]["connection"] == None:
202+
sock = SWITCHBOARD[port]['incoming_socket']
203+
connection, client_address = sock.accept()
204+
connection.setblocking(False)
205+
SWITCHBOARD[port]['connection'] = connection
206+
name = SWITCHBOARD[port]['sender'].replace('_Actual', '')
207+
log('established connection with {0} on port {1}'.format(name, port))
208+
else:
209+
connection = SWITCHBOARD[port]['connection']
210+
elif connection_type == 'udp':
211+
connection = SWITCHBOARD[port]["connection"]
212+
else:
213+
log('Invalid connection type {0}. Please contact an administrator with this error.'.format(connection_type))
214+
sys.exit(1)
215+
216+
#TODO: May have to the max recvfrom size.
217+
#The recvfrom call will raise a OSError if there is nothing to recieve.
218+
message, snd = connection.recvfrom(4096)
219+
sender = SWITCHBOARD[port]['sender'].replace("_Actual", "")
220+
221+
if message.decode('utf-8') == '' and connection_type == 'tcp':
222+
log('Host {0} disconnected on port {1}.'.format(sender,port))
223+
SWITCHBOARD[port]['connected'] = False
224+
SWITCHBOARD[port]['connection'].close()
225+
SWITCHBOARD[port]['connection'] = None
226+
continue
227+
228+
log('Recieved message {!r} from {} on port {}'.format(message,sender,port))
229+
230+
#if we did not error:
231+
connect_outgoing_socket(port)
232+
recipient = SWITCHBOARD[port]['recipient']
233+
234+
data = {
235+
'sender' : sender,
236+
'recipient' : recipient,
237+
'port' : port,
238+
'message' : message
239+
}
240+
241+
#TODO allow rules to change what time is used for the priority queue.
242+
currentTime = datetime.datetime.now()
243+
tup = (currentTime, data)
244+
QUEUE.put(tup)
245+
except socket.timeout as e:
246+
#This is likely an acceptable error caused by non-blocking sockets having nothing to read.
247+
err = e.args[0]
248+
if err == 'timed out':
249+
log('no data')
250+
else:
251+
log('real error!')
252+
log(traceback.format_exc())
253+
except BlockingIOError as e:
254+
pass
255+
except ConnectionRefusedError as e:
256+
#this means that connect_outgoing_tcp didn't work.
257+
log('Connection on outgoing channel not established. Message dropped.')
258+
log(traceback.format_exc())
259+
SWITCHBOARD[port]['connected'] = False
260+
except socket.gaierror as e:
261+
log("Unable to connect to unknown/not set up entity.")
262+
log(traceback.format_exc())
263+
except Exception as e:
264+
log("ERROR: error listening to socket {0}".format(port))
265+
log(traceback.format_exc())
266+
267+
268+
##################################################################################################################
269+
# CONTROL FUNCTIONS
270+
##################################################################################################################
271+
272+
273+
#Do everything that should happen before multiprocessing kicks in.
274+
def init():
275+
log('Booting up the router...')
276+
build_switchboard()
277+
#Only supporting tcp at the moment.
278+
log('Connecting incoming sockets...')
279+
connect_incoming_sockets()
280+
281+
def run():
282+
running = True
283+
sleep(1)
284+
log('Listening for incoming connections...')
285+
while RUNNING:
286+
listen_to_sockets()
287+
process_queue()
288+
289+
290+
if __name__ == '__main__':
291+
init()
292+
run()
293+
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Waiting on standard input...
2+
received 'Hi there! I'm container0'
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Waiting on standard input...
2+
received 'Hi there! I'm container1'
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/usr/bin/env python3
2+
import sys
3+
4+
5+
print('Waiting on standard input...')
6+
sys.stdout.flush()
7+
a = input('')
8+
print("received '{0}'".format(a))
9+
sys.stdout.flush()

0 commit comments

Comments
 (0)