diff --git a/.gitignore b/.gitignore
index be2afa23..ec6361f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,9 @@ vmms/id_rsa*
 courselabs/*
 # config
 config.py
+output_gen
+.gitignore
+
 # Virtualenv
 .Python
diff --git a/Dockerfile b/Dockerfile
index f2f3c4eb..f0b989b6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,26 +1,19 @@
 # Start with empty ubuntu machine
-FROM ubuntu:15.04
+FROM ubuntu
 MAINTAINER Autolab Development Team "autolab-dev@andrew.cmu.edu"

 # Setup correct environment variable
 ENV HOME /root

-# Change to working directory
-WORKDIR /opt
-
-# Move all code into Tango directory
-ADD . TangoService/Tango/
-WORKDIR /opt/TangoService/Tango
-RUN mkdir volumes
-
-WORKDIR /opt
+RUN mkdir -p /opt/TangoFiles/volumes /opt/TangoFiles/courselabs /opt/TangoFiles/output

 # Install dependancies
 RUN apt-get update && apt-get install -y \
     nginx \
     curl \
     git \
+    iputils-ping \
     vim \
     supervisor \
     python-pip \
@@ -28,7 +21,7 @@ RUN apt-get update && apt-get install -y \
     build-essential \
     tcl8.5 \
     wget \
-    libgcrypt11-dev \
+    libgcrypt11-dev \
     zlib1g-dev \
     apt-transport-https \
     ca-certificates \
@@ -38,13 +31,10 @@ RUN apt-get update && apt-get install -y \
     && rm -rf /var/lib/apt/lists/*

 # Install Redis
+WORKDIR /opt
 RUN wget http://download.redis.io/releases/redis-stable.tar.gz && tar xzf redis-stable.tar.gz
 WORKDIR /opt/redis-stable
-RUN make && make install
-WORKDIR /opt/TangoService/Tango/
-
-# Install Docker from Docker Inc. repositories.
-RUN curl -sSL https://get.docker.com/ | sh
+RUN make && make install

 # Install the magic wrapper.
 ADD ./wrapdocker /usr/local/bin/wrapdocker
 RUN chmod +x /usr/local/bin/wrapdocker

 # Define additional metadata for our image.
 VOLUME /var/lib/docker

-# Create virtualenv to link dependancies
+# Install python dependencies
+ADD ./requirements.txt /opt/TangoFiles/requirements.txt
+WORKDIR /opt/TangoFiles
 RUN pip install virtualenv && virtualenv .
-# Install python dependancies
 RUN pip install -r requirements.txt
+RUN pip install pytz

 RUN mkdir -p /var/log/docker /var/log/supervisor

 # Move custom config file to proper location
-RUN cp /opt/TangoService/Tango/deployment/config/nginx.conf /etc/nginx/nginx.conf
-RUN cp /opt/TangoService/Tango/deployment/config/supervisord.conf /etc/supervisor/supervisord.conf
-RUN cp /opt/TangoService/Tango/deployment/config/redis.conf /etc/redis.conf
+ADD ./deployment/config/nginx.conf /etc/nginx/nginx.conf
+ADD ./deployment/config/supervisord.conf /etc/supervisor/supervisord.conf
+ADD ./deployment/config/redis.conf /etc/redis.conf
+
+#JMB added for EC2 config
+ADD ./deployment/config/boto.cfg /etc/boto.cfg
+ADD ./deployment/config/746-autograde.pem /root/746-autograde.pem
+RUN chmod 600 /root/746-autograde.pem

 # Reload new config scripts
 CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"]
-
-# TODO:
+# TODO:
 # volumes dir in root dir, supervisor only starts after calling start once , nginx also needs to be started
 # Different log numbers for two different tangos
 # what from nginx forwards requests to tango
diff --git a/autodriver/Makefile b/autodriver/Makefile
index 203ca231..9fde9293 100644
--- a/autodriver/Makefile
+++ b/autodriver/Makefile
@@ -1,5 +1,6 @@
 CC = gcc
 CFLAGS = -W -Wall -Wextra
+LDFLAGS = -pthread

 OBJS = autodriver.o
diff --git a/autodriver/README b/autodriver/README
new file mode 100644
index 00000000..999458df
--- /dev/null
+++ b/autodriver/README
@@ -0,0 +1,25 @@
+To build a grading vm image for Autolab jobs:
+
+Create a vm with a stock linux image.
+Copy autodriver.c and Makefile to the vm and compile them into the autodriver binary.
+Copy autodriver to any common path, and make it owned by root with the setuid bits set.
+For example: -rwsr-sr-x 1 root root /usr/bin/autodriver
+
+Create the following users:
+autolab: The ssh/scp user tied to the selected key pair of your cloud account
+autograde: The user that runs the TA's grader, starting from the top Makefile (see autodriver.c)
+student: For students to use the exact same image for coding/testing
+
+The sequence of grading using the above image is as follows:
+
+The grading engine: scp the top-level Makefile, autograde.tar (both made by course staff)
+and the student's submission to the grading vm.
+
+The grading engine: ssh to run the autodriver program.
+
+The grading vm: the autodriver program (running as root because of the setuid bit) starts
+a child process (running as user autograde) to run "make" with the top-level Makefile.
+
+The grading engine: scp the output file from the grading vm.
+
+
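The scp/ssh sequence in the README above can be sketched in a few lines of Python. This is an editorial illustration only, not Tango's actual dispatch code; the host, key path, directory and file names, and the option values are hypothetical, and only the -i/-z flags introduced by this change plus the directory argument are assumed.

import subprocess

VM = "autolab@grading-vm.example.com"        # hypothetical grading vm
KEY = ["-i", "/path/to/cloud-keypair.pem"]   # key pair of the cloud account

def run(cmd):
    subprocess.run(cmd, check=True)          # stop the run on any failed step

# 1. Copy the top-level Makefile, autograde.tar and the student's handin in.
for f in ["Makefile", "autograde.tar", "handin.tar"]:
    run(["scp"] + KEY + [f, VM + ":job/"])

# 2. Run autodriver (setuid root) on the job directory; -i and -z are the
#    new timestamp-interval and timezone options.
run(["ssh"] + KEY + [VM, "autodriver -i 30 -z America/New_York job"])

# 3. Copy the graded output back out (the remote file name is whatever
#    OUTPUT_FILE resolves to on the image).
run(["scp"] + KEY + [VM + ":output", "feedback.txt"])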
diff --git a/autodriver/autodriver.c b/autodriver/autodriver.c
index e4f05a60..3dea9cb3 100644
--- a/autodriver/autodriver.c
+++ b/autodriver/autodriver.c
@@ -36,16 +36,52 @@
 #include
 #include
 #include
+#include <pthread.h>
+#include <time.h>
+
+// How autodriver works:
+//
+// The parent process creates an output file and starts a child process, run_job().
+// The child process assumes that under the home directory of the user "autograde"
+// there is a directory specified on the command line of this program.
+// Under that directory, there is a Makefile.
+// The child runs the Makefile to start the tests and redirects all output
+// to the output file created by the parent process.
+//
+// After the child process terminates, the parent parses the output file and
+// sends the content to stdout, in dump_output() and dump_file(). If the
+// output file is too large, it is elided in the middle. If the timestamp
+// option (-i) is specified, timestamps are inserted into the output stream.
+//
+// If the timestamp option is set: the parent starts a thread, timestampFunc(),
+// after starting the child process. The thread records, at the given interval,
+// timestamps (output file size AND time). While parsing the output file
+// after the child process, the recorded timestamps are inserted at the offsets
+// by insertTimestamp().

 #define min(x, y) ((x) < (y) ? (x) : (y))

-#define OUTPUT_HEADER "Autodriver: "
+char timestampStr[100];
+char *getTimestamp(time_t t) {
+    time_t ltime = t ? t : time(NULL);
+    struct tm *tmInfo = localtime(&ltime);
+    strftime(timestampStr, 100, "%Y%m%d-%H:%M:%S", tmInfo);
+    return timestampStr;  // return global variable for convenience
+}
+
+#define ERROR_ERRNO(format, ...) \
+    printf("Autodriver@%s: ERROR " format " at line %d: %s\n", \
+           getTimestamp(0), ##__VA_ARGS__, __LINE__, strerror(errno))

-#define ERROR_ERRNO(msg) \
-    printf(OUTPUT_HEADER "%s at line %d: %s\n", msg, __LINE__, strerror(errno))
+#define ERROR(format, ...) \
+    printf("Autodriver@%s: ERROR " format " at line %d\n", \
+           getTimestamp(0), ##__VA_ARGS__, __LINE__)

-#define ERROR(msg) \
-    printf(OUTPUT_HEADER "%s at line %d\n", msg, __LINE__)
+#define MESSAGE(format, ...) \
+    printf("Autodriver@%s: " format "\n", getTimestamp(0), ##__VA_ARGS__)
+
+#define NL_MESSAGE(format, ...) \
+    printf("\nAutodriver@%s: " format "\n", getTimestamp(0), ##__VA_ARGS__)

 #define EXIT__BASE 1
@@ -69,8 +105,6 @@
 /* Number of seconds to wait in between pkills */
 #define SHUTDOWN_GRACE_TIME 3

-error_t argp_err_exit_status = EXIT_USAGE;
-
 /**
  * @brief A structure containing all of the user-configurable settings
  */
@@ -82,8 +116,27 @@ struct arguments {
     struct passwd user_info;
     char *passwd_buf;
     char *directory;
+    char *timezone;
+    unsigned timestamp_interval;
 } args;

+unsigned long startTime = 0;
+int childTimedOut = 0;
+
+typedef struct {
+    time_t time;
+    size_t offset;
+} timestamp_map_t;
+
+#define TIMESTAMP_MAP_CHUNK_SIZE 1024
+
+timestamp_map_t *timestampMap = NULL;  // remember time@offset of output file
+unsigned timestampCount = 0;
+int childFinished = 0;
+
+size_t outputFileSize = 0;
+int child_output_fd;  // OUTPUT_FILE created/opened by main process, used by child
+
 /**
  * @brief Parses a string into an unsigned integer.
  *
@@ -154,6 +207,151 @@ static int parse_user(char *name, struct passwd *user_info, char **buf) {
     return 0;
 }

+// pthread function; keeps a map of timestamps and the user's output file offsets.
+// The thread is not started unless the timestamp interval option is specified.
+void *timestampFunc() {
+    time_t lastStamp = 0;
+    int lastJumpIndex = -1;
+    int output_fd;
+
+    // open output file read only to build the timestamp:offset map
+    if ((output_fd = open(OUTPUT_FILE, O_RDONLY)) < 0) {
+        ERROR_ERRNO("Opening output file by parent process");
+        // don't quit for this type of error
+    }
+
+    while (1) {
+        if (childFinished) {
+            break;
+        }
+
+        sleep(1);
+
+        // allocate/reallocate space to create/grow the map
+        if (timestampCount % TIMESTAMP_MAP_CHUNK_SIZE == 0) {
+            timestamp_map_t *newBuffer =
+                realloc(timestampMap,
+                        sizeof(timestamp_map_t) * (TIMESTAMP_MAP_CHUNK_SIZE + timestampCount));
+            if (!newBuffer) {
+                ERROR_ERRNO("Failed to allocate timestamp map. 
Current map size %d", + timestampCount); + continue; // continue without allocation + } + timestampMap = newBuffer; + newBuffer += timestampCount; + memset(newBuffer, 0, sizeof(timestamp_map_t) * TIMESTAMP_MAP_CHUNK_SIZE); + } + + struct stat buf; + if (output_fd <= 0 || fstat(output_fd, &buf) < 0) { + ERROR_ERRNO("Statting output file to read offset"); + continue; // simply skip this time + } + + size_t currentOffset = buf.st_size; + time_t currentTime = time(NULL); + + // record following timestamps: + // 1. enough time has passed since last timestamp or + // 2. output has grown and enough time has passed since last offset change + + if (timestampCount == 0 || + timestampMap[timestampCount - 1].offset != currentOffset) { + if (lastJumpIndex >= 0 && + currentTime - timestampMap[lastJumpIndex].time < args.timestamp_interval) { + continue; + } + lastJumpIndex = timestampCount; + } else if (currentTime - lastStamp < args.timestamp_interval) { + continue; + } + + lastStamp = currentTime; + timestampMap[timestampCount].time = currentTime; + timestampMap[timestampCount].offset = currentOffset; + timestampCount++; + } + + if (output_fd <= 0 || close(output_fd) < 0) { + ERROR_ERRNO("Closing output file before cleanup"); + } + return NULL; +} + +int writeBuffer(char *buffer, size_t nBytes) { // nBytes can be zero (no-op) + ssize_t nwritten = 0; + size_t write_rem = nBytes; + char *write_base = buffer; + + while (write_rem > 0) { + if ((nwritten = write(STDOUT_FILENO, write_base, write_rem)) < 0) { + ERROR_ERRNO("Writing output"); + ERROR("Failure details: write_base %p write_rem %lu", write_base, write_rem); + return -1; + } + write_rem -= nwritten; + write_base += nwritten; + } + return 0; +} + +// Insert the timestamp at the appropriate places. +// When failing to write to the output file, return with updated scanCursor, +void insertTimestamp(char *buffer, + size_t bufferOffset, + size_t bufferLength, + char **scanCursorInOut, + unsigned *currentStampInOut) { + char *scanCursor = *scanCursorInOut; + unsigned currentStamp = *currentStampInOut; + size_t nextOffset = bufferOffset + bufferLength; + size_t eolOffset = 0; + + // pace through timestamps that fall into the buffer + while (currentStamp < timestampCount && + timestampMap[currentStamp].offset < nextOffset) { + + // there might be unused timestamps from last read buffer or before last eol. + // skip over them. + if (timestampMap[currentStamp].offset < bufferOffset || + timestampMap[currentStamp].offset <= eolOffset) { + currentStamp++; + continue; + } + + char *eolSearchStart = buffer + (timestampMap[currentStamp].offset - bufferOffset); + char *nextEol = strchr(eolSearchStart, '\n'); + if (!nextEol) { // no line break found in read buffer to insert timestamp + break; + } + + // write the stuff up to the line break + if (writeBuffer(scanCursor, (nextEol + 1) - scanCursor)) { + ERROR("Write failed: buffer %p cursor %p nextEol %p", buffer, scanCursor, nextEol); + break; + } + scanCursor = nextEol + 1; + + // no timestamp at EOF, because the test scores are on the last line + eolOffset = bufferOffset + (nextEol - buffer); + if (eolOffset + 1 >= outputFileSize) { + break; + } + + // write the timestamp + char stampInsert[300]; + sprintf(stampInsert, + "...[timestamp %s inserted by autodriver at offset ~%lu. 
Maybe out of sync with output's own timestamps.]...\n",
+                getTimestamp(timestampMap[currentStamp].time),
+                timestampMap[currentStamp].offset);
+        if (writeBuffer(stampInsert, strlen(stampInsert))) {break;}
+        currentStamp++;
+    }  // while loop through the stamps falling into read buffer's range
+
+    *scanCursorInOut = scanCursor;
+    *currentStampInOut = currentStamp;
+}
+
 /**
  * @brief Dumps a specified number of bytes from a file to standard out
  *
@@ -164,40 +362,50 @@
  * @return 0 on success, -1 on failure
  */
 static int dump_file(int fd, size_t bytes, off_t offset) {
-    char buffer[BUFSIZE];
-    char *write_base;
-    ssize_t nread, nwritten;
-    size_t read_rem, write_rem;
+    static unsigned currentStamp = 0;
+    size_t read_rem = bytes;
+    size_t nextOffset = offset;
+
+    if (offset) {  // second part of output file, after truncating in the middle
+        // insert a message, indicating file truncation
+        char *msg = "\n...[excess bytes elided by autodriver]...\n";
+        if (writeBuffer(msg, strlen(msg))) {return -1;}
+    }

     // Flush stdout so our writes here don't race with buffer flushes
     if (fflush(stdout) != 0) {
-        ERROR_ERRNO("Error flushing standard out");
+        ERROR_ERRNO("Flushing standard out");
         return -1;
     }

     if (lseek(fd, offset, SEEK_SET) < 0) {
-        ERROR_ERRNO("Error seeking in output file");
+        ERROR_ERRNO("Seeking in output file");
         return -1;
     }

-    read_rem = bytes;
     while (read_rem > 0) {
-        if ((nread = read(fd, buffer, min(read_rem, BUFSIZE))) < 0) {
-            ERROR_ERRNO("Error reading from output file");
-            return -1;
-        }
-        write_rem = nread;
-        write_base = buffer;
-        while (write_rem > 0) {
-            if ((nwritten = write(STDOUT_FILENO, write_base, write_rem)) < 0) {
-                ERROR_ERRNO("Error writing output");
-                return -1;
-            }
-            write_rem -= nwritten;
-            write_base += nwritten;
-        }
-        read_rem -= nread;
-    }
+        char buffer[BUFSIZE + 1];  // keep the last byte as string terminator
+        ssize_t nread;
+
+        memset(buffer, 0, BUFSIZE + 1);
+        if ((nread = read(fd, buffer, min(read_rem, BUFSIZE))) < 0) {
+            ERROR_ERRNO("Reading from output file");
+            return -1;
+        }
+        read_rem -= nread;
+        char *scanCursor = buffer;
+
+        if (timestampCount) {  // if inserting timestamps
+            insertTimestamp(buffer, nextOffset, nread, &scanCursor, &currentStamp);
+        }
+
+        if (writeBuffer(scanCursor, nread - (scanCursor - buffer))) {
+            ERROR("Write failed: buffer %p cursor %p nread %lu", buffer, scanCursor, nread);
+            return -1;
+        }
+
+        nextOffset += nread;  // offset of next read buffer in the file
+    }  // while loop finishes reading

     return 0;
 }
@@ -233,6 +441,15 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
                          "The argument to osize must be a nonnegative integer");
         }
         break;
+    case 'i':
+        if (parse_uint(arg, &arguments->timestamp_interval) < 0) {
+            argp_failure(state, EXIT_USAGE, 0,
+                         "The argument to timestamp-interval must be a nonnegative integer");
+        }
+        break;
+    case 'z':
+        args.timezone = arg;
+        break;
     case ARGP_KEY_ARG:
         switch (state->arg_num) {
         case 0:
@@ -290,13 +507,13 @@ static void setup_dir(void) {
     char *mv_args[] = {"/bin/mv", "-f", args.directory,
                        args.user_info.pw_dir, NULL};
     if (call_program("/bin/mv", mv_args) != 0) {
-        ERROR("Error moving directory");
+        ERROR("Moving directory");
         exit(EXIT_OSERROR);
     }

     // And switch over to that directory
     if (chdir(args.user_info.pw_dir) < 0) {
-        ERROR_ERRNO("Error changing directories");
+        ERROR_ERRNO("Changing directories");
         exit(EXIT_OSERROR);
     }

@@ -305,7 +522,7 @@ static void setup_dir(void) {
     sprintf(owner, "%d:%d",
            args.user_info.pw_uid, args.user_info.pw_gid);
     char *chown_args[] = {"/bin/chown", "-R", owner, args.directory, NULL};
     if (call_program("/bin/chown", chown_args) != 0) {
-        ERROR("Error chowining directory");
+        ERROR("Chowning directory");
         exit(EXIT_OSERROR);
     }
 }
@@ -316,23 +533,25 @@
 static void dump_output(void) {
     int outfd;
     if ((outfd = open(OUTPUT_FILE, O_RDONLY)) < 0) {
-        ERROR_ERRNO("Error opening output file");
+        ERROR_ERRNO("Opening output file at the end of test");
         exit(EXIT_OSERROR);
     }
     struct stat stat;
     if (fstat(outfd, &stat) < 0) {
-        ERROR_ERRNO("Error stating output file");
+        ERROR_ERRNO("Statting output file");
         exit(EXIT_OSERROR);
     }
+    outputFileSize = stat.st_size;

     // Truncate output if we have to
     if (args.osize > 0 && stat.st_size > args.osize) {
+        MESSAGE("Output size %lu > limit %u -- will elide in the middle",
+                stat.st_size, args.osize);
         unsigned part_size = args.osize / 2;
         if (dump_file(outfd, part_size, 0) < 0) {
            exit(EXIT_OSERROR);
         }
-        printf("\n...[excess bytes elided]...\n");
         if (dump_file(outfd, part_size, stat.st_size - part_size) < 0) {
             exit(EXIT_OSERROR);
         }
@@ -342,7 +561,7 @@
         }
     }
     if (close(outfd) < 0) {
-        ERROR_ERRNO("Error closing output file");
+        ERROR_ERRNO("Closing output file at the end of test");
         exit(EXIT_OSERROR);
     }
 }
@@ -360,8 +579,8 @@ static int kill_processes(char *sig) {
                           GRADING_USER, NULL};
     if ((ret = call_program("/usr/bin/pkill", pkill_args)) > 1) {
-        ERROR("Error killing user processes");
-        exit(EXIT_OSERROR);
+        ERROR("Killing user processes");
+        // don't quit. Let the caller decide
     }
     return ret;
 }
@@ -381,7 +600,7 @@
             sleep(SHUTDOWN_GRACE_TIME);
             if (try > MAX_KILL_ATTEMPTS) {
                 ERROR("Gave up killing user processes");
-                exit(EXIT_OSERROR);
+                break;  // continue to cleanup with best effort
             }
             ret = kill_processes("-KILL");
             try++;
@@ -394,7 +613,7 @@
     char *find_args[] = {"find", "/usr/bin/find", ".", "/tmp", "/var/tmp",
                          "-user", args.user_info.pw_name, "-delete", NULL};
     if (call_program("/usr/bin/env", find_args) != 0) {
-        ERROR("Error deleting user's files");
+        ERROR("Deleting user's files");
         exit(EXIT_OSERROR);
     }
 }
@@ -413,6 +632,15 @@ static int monitor_child(pid_t child) {
     int killed = 0;
     int status;

+    // create a thread to track the file size at the given time interval
+    pthread_t timestampThread = 0;  // this thread needs no cancellation
+    if (args.timestamp_interval > 0) {
+        if (pthread_create(&timestampThread, NULL, timestampFunc, NULL)) {
+            ERROR_ERRNO("Failed to create timestamp thread");
+            exit(EXIT_OSERROR);
+        }
+    }
+
     // Handle the timeout if we have to
     if (args.timeout != 0) {
         struct timespec timeout;
@@ -425,25 +653,37 @@
         if (sigtimedwait(&sigset, NULL, &timeout) < 0) {
             // Child timed out
+            ERROR("Job timed out after %d seconds", args.timeout);
             assert(errno == EAGAIN);
             kill(child, SIGKILL);
             killed = 1;
+            childTimedOut = 1;
         }
     }

     if (waitpid(child, &status, 0) < 0) {
-        ERROR_ERRNO("Error reaping child");
+        ERROR_ERRNO("Reaping child");
         exit(EXIT_OSERROR);
     }
-    if (killed) {
-        printf(OUTPUT_HEADER "Job timed out after %d seconds\n", args.timeout);
-    } else {
-        printf(OUTPUT_HEADER "Job exited with status %d\n",
-               WEXITSTATUS(status));
+    MESSAGE("Test terminates. Duration: %lu seconds", time(NULL) - startTime);
+
+    if (!killed) {
+        MESSAGE("Job exited with status %d", WEXITSTATUS(status));
+    }
+
+    if (args.timestamp_interval > 0) {
+        MESSAGE("Timestamps inserted at %d-second or larger intervals, depending on output rates",
+                args.timestamp_interval);
+    }
+    MESSAGE("Also check end of output for potential errors");
+    childFinished = 1;
     dump_output();
+    if (childTimedOut) {
+        NL_MESSAGE("ERROR Job timed out");  // print the error again at the end of output
+    }
+
     cleanup();
     exit(killed ? EXIT_TIMEOUT : EXIT_SUCCESS);
 }
@@ -468,7 +708,7 @@ static void run_job(void) {
     if (args.nproc != 0) {
         struct rlimit rlimit = {args.nproc, args.nproc};
         if (setrlimit(RLIMIT_NPROC, &rlimit) < 0) {
-            perror("Error setting process limit");
+            perror("Setting process limit");
             exit(EXIT_OSERROR);
         }
     }
@@ -476,61 +716,56 @@
     if (args.fsize != 0) {
         struct rlimit rlimit = {args.fsize, args.fsize};
         if (setrlimit(RLIMIT_FSIZE, &rlimit) < 0) {
-            ERROR_ERRNO("Error setting filesize limit");
+            ERROR_ERRNO("Setting filesize limit");
             exit(EXIT_OSERROR);
         }
     }

     // Drop permissions
     if (initgroups(args.user_info.pw_name, args.user_info.pw_gid) < 0) {
-        ERROR_ERRNO("Error setting supplementary group IDs");
+        ERROR_ERRNO("Setting supplementary group IDs");
         exit(EXIT_OSERROR);
     }

     if (setresgid(args.user_info.pw_gid, args.user_info.pw_gid,
                   args.user_info.pw_gid) < 0) {
-        ERROR_ERRNO("Error setting group ID");
+        ERROR_ERRNO("Setting group ID");
         exit(EXIT_OSERROR);
     }

     if (setresuid(args.user_info.pw_uid, args.user_info.pw_uid,
                   args.user_info.pw_uid) < 0) {
-        ERROR_ERRNO("Error setting user ID");
+        ERROR_ERRNO("Setting user ID");
         exit(EXIT_OSERROR);
     }

     // Redirect output
-    int fd;
-    if ((fd = open(OUTPUT_FILE, O_WRONLY | O_CREAT | O_TRUNC,
-                   S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
-        ERROR_ERRNO("Error opening output file");
-        exit(EXIT_OSERROR);
-    }
+    int fd = child_output_fd;
     if (dup2(fd, STDOUT_FILENO) < 0) {
-        ERROR_ERRNO("Error redirecting standard output");
+        ERROR_ERRNO("Redirecting standard output");
         exit(EXIT_OSERROR);
     }
     if (dup2(fd, STDERR_FILENO) < 0) {
-        ERROR_ERRNO("Error redirecting standard error");
+        ERROR_ERRNO("Redirecting standard error");
         exit(EXIT_OSERROR);
     }
     if (close(fd) < 0) {
-        ERROR_ERRNO("Error closing output file");
+        ERROR_ERRNO("Closing output file by child process");
         exit(EXIT_OSERROR);
     }

     // Switch into the folder
     if (chdir(args.directory) < 0) {
-        ERROR_ERRNO("Error changing directory");
+        ERROR_ERRNO("Changing directory");
         exit(EXIT_OSERROR);
     }

     // Finally exec job
     execl("/usr/bin/make", "make", NULL);
-    ERROR_ERRNO("Error executing make");
+    ERROR_ERRNO("Executing make");
     exit(EXIT_OSERROR);
 }
@@ -540,10 +775,13 @@ int main(int argc, char **argv) {
     args.fsize = 0;
     args.timeout = 0;
     args.osize = 0;
+    args.timestamp_interval = 0;
+    args.timezone = NULL;
+    startTime = time(NULL);

     // Make sure this isn't being run as root
     if (getuid() == 0) {
-        printf(OUTPUT_HEADER "Autodriver should not be run as root.\n");
+        ERROR("Autodriver should not be run as root");
         exit(EXIT_USAGE);
     }
@@ -566,6 +804,10 @@
          "Limit the amount of time a job is allowed to run (seconds)", 0},
         {"osize", 'o', "size", 0,
          "Limit the amount of output returned (bytes)", 0},
+        {"timestamp-interval", 'i', "interval", 0,
+         "Interval (seconds) for placing timestamps in user output file", 0},
+        {"timezone", 'z', "timezone", 0,
+         "Timezone setting. Default is UTC", 0},
         {0, 0, 0, 0, 0, 0}
     };

@@ -574,6 +816,16 @@
     argp_parse(&parser, argc, argv, 0, NULL, &args);

+    // set time zone preference: -z argument, TZ environment variable, system wide
+    if (args.timezone) {
+        char tz[100];
+        strcpy(tz, "TZ=");
+        strcat(tz, args.timezone);
+        putenv(tz);
+    }
+    tzset();
+    MESSAGE("Test Starts. Time zone %s:%s", tzname[0], tzname[1]);
+
     setup_dir();

     // Block SIGCHLD to make sure monitor_child recieves it.
@@ -582,6 +834,20 @@
     sigset_t sigset;
     sigemptyset(&sigset);
     sigaddset(&sigset, SIGCHLD);
     sigprocmask(SIG_BLOCK, &sigset, NULL);

+    // output file is written by the child process while running the test.
+    // It's created here before forking, because the timestamp thread needs
+    // read access to it.
+    if ((child_output_fd = open(OUTPUT_FILE, O_WRONLY | O_CREAT | O_TRUNC | O_SYNC,
+                                S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0) {
+        ERROR_ERRNO("Creating output file");
+        exit(EXIT_OSERROR);
+    }
+    // chown output file to user "autograde"
+    if (fchown(child_output_fd, args.user_info.pw_uid, args.user_info.pw_gid) < 0) {
+        ERROR_ERRNO("Chowning output file");
+        exit(EXIT_OSERROR);
+    }
+
     pid_t pid = fork();
     if (pid < 0) {
         ERROR_ERRNO("Unable to fork");
     } else if (pid == 0) {
         run_job();
     } else {
+        if (close(child_output_fd) < 0) {
+            ERROR_ERRNO("Closing output file by parent process");
+            // don't quit for this type of error
+        }
+
         monitor_child(pid);
     }

     return 0;
 }
-
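For a feel of what the new -i option produces in the dumped output: using the format string from insertTimestamp() above, a stamp recorded at, say, offset 5000 would appear after the next line break as a marker line like the following (the date and offset are made-up values for illustration):

...[timestamp 20180401-13:05:22 inserted by autodriver at offset ~5000. Maybe out of sync with output's own timestamps.]...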
diff --git a/autodriver/test/Makefile b/autodriver/test/Makefile
index c5859d96..41745a35 100644
--- a/autodriver/test/Makefile
+++ b/autodriver/test/Makefile
@@ -1,3 +1,17 @@
-autograde:
-	id
-	sleep 5
+CC = gcc
+CFLAGS = -W -Wall -Wextra
+
+OBJS = output_gen.o
+
+all: output_gen run_output_gen
+
+output_gen: $(OBJS)
+	$(CC) $(LDFLAGS) -o output_gen $(OBJS)
+
+clean:
+	rm -f *.o output_gen
+
+.PHONY: clean
+
+run_output_gen:
+	output_gen
diff --git a/autodriver/test/README b/autodriver/test/README
new file mode 100644
index 00000000..790e37fd
--- /dev/null
+++ b/autodriver/test/README
@@ -0,0 +1,8 @@
+How to test autodriver: First create a user "autograde" on the test machine.
+
+cd Tango/autodriver
+make
+cp -r test tmp
+autodriver tmp
+
+
diff --git a/autodriver/test/output_gen.c b/autodriver/test/output_gen.c
new file mode 100644
index 00000000..abd81943
--- /dev/null
+++ b/autodriver/test/output_gen.c
@@ -0,0 +1,39 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+int main() {
+    srand((unsigned)time(NULL));
+    putenv("TZ=America/New_York");
+    tzset();
+
+    int i, k;
+    char timeStr[100];
+    for (k = 0; k < 100; k++) {
+        for (i = 0; i < 200; i++) {
+            time_t ltime = time(NULL);
+            struct tm* tmInfo = localtime(&ltime);
+            strftime(timeStr, 100, "%Y%m%d-%H:%M:%S", tmInfo);
+            printf("TIME: \"%s\" followed by 3 lines of random length\n", timeStr);
+            int j;
+            for (j = 0; j < 3; j++) {
+                int lineLength = rand() % 2000; // longer than autodriver's buf size
+                int count = 0;
+                char line[2001];
+                memset(line, 0, 2001);
+                while (count < lineLength) {
+                    line[count] = '0' + count % 10;
+                    count++;
+                }
+                printf("%s\n", line);
+            }
+        }
+        sleep(1);
+    }
+    sleep(5);
+    exit(0);
+}
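The TZ/tzset() handling that autodriver and output_gen.c use above has a direct Python analog, shown below; it may be handy when checking from the Tango side that a grading vm's timestamps line up. time.tzset() is Unix-only, and the zone name is just an example:

import os
import time

os.environ["TZ"] = "America/New_York"      # what autodriver's -z option does via putenv()
time.tzset()                               # re-read TZ, like tzset() in C
print(time.strftime("%Y%m%d-%H:%M:%S"))   # the same stamp format autodriver uses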
diff --git a/config.template.py b/config.template.py
index c7a92007..2ed3b115 100644
--- a/config.template.py
+++ b/config.template.py
@@ -23,6 +23,7 @@ class Config:
     PORT = 3000

     # Log file. Setting this to None sends the server output to stdout
+    # Strongly suggest setting up a log file
     LOGFILE = None

     # Logging level
@@ -57,6 +58,8 @@ class Config:
     NUM_THREADS = 20

     # We have the option to reuse VMs or discard them after each use
+    # xxxXXX??? strongly suspect the code path for the False case
+    # is not working, after a failed experiment. -- czang@cmu.edu
     REUSE_VMS = True

     # Worker waits this many seconds for functions waitvm, copyin (per
@@ -67,6 +70,11 @@ class Config:
     RUNJOB_TIMEOUT = 60
     COPYOUT_TIMEOUT = 30

+    # Time zone and timestamp report interval for autodriver execution.
+    # Both are optional.
+    AUTODRIVER_LOGGING_TIME_ZONE = "UTC"  # e.g. "America/New_York".
+    AUTODRIVER_TIMESTAMP_INTERVAL = 0  # in seconds. 0 => no timestamp insertion
+
     # Docker constants
     BOOT2DOCKER_INIT_TIMEOUT = 5
     BOOT2DOCKER_START_TIMEOUT = 30
@@ -100,11 +108,20 @@ class Config:
     # Give VMMS this many seconds to destroy a VM before giving up
     DESTROY_SECS = 5

+    # When set to True, put the vm aside for debugging after an OS ERROR from autodriver
+    KEEP_VM_AFTER_FAILURE = None
+
     # Time to wait between creating VM instances to give DNS time to cool down
     CREATEVM_SECS = 1

     # Default vm pool size
-    POOL_SIZE = 2
+    POOL_SIZE = 10
+
+    # vm pool reserve size. If set, the free pool size is maintained at this level.
+    POOL_SIZE_LOW_WATER_MARK = 5  # optional, can be None
+
+    # Increment step when enlarging the vm pool
+    POOL_ALLOC_INCREMENT = 2  # can be None, which is treated as 1, the default

     # Optionally log finer-grained timing information
     LOG_TIMING = False
@@ -134,13 +151,25 @@ class Config:
     ######
     # Part 5: EC2 Constants
     #
+
+    # Special instructions to admin: Tango finds usable images from aws
+    # in the following fashion:
+    # It examines every ami (Amazon Machine Image) owned by the EC2_USER_NAME,
+    # looks for a tag with the key "Name" (case sensitive), and uses the value
+    # of the tag as the image name for the ami, for example, ubuntu.img or
+    # myImage.img. If an ami doesn't have such a tag, it is ignored (watch
+    # for a log message).
+    #
+    # The lab author, when specifying an image to use, should specify one
+    # of the available image names.
+
     EC2_REGION = ''
     EC2_USER_NAME = ''
-    DEFAULT_AMI = ''
+    KEEP_VM_AFTER_FAILURE = False
     DEFAULT_INST_TYPE = ''
     DEFAULT_SECURITY_GROUP = ''
     SECURITY_KEY_PATH = ''
-    DYNAMIC_SECURITY_KEY_PATH = ''
+    DYNAMIC_SECURITY_KEY_PATH = ''  # key file placed at root "/" by default
     SECURITY_KEY_NAME = ''
     TANGO_RESERVATION_ID = ''
     INSTANCE_RUNNING = 16  # Status code of a instance that is running
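The AMI discovery described in the "Special instructions to admin" comment above might look roughly like the sketch below with boto3 (which this change adds to requirements.txt). This is an illustrative reading of the documented behavior, not the actual ec2 vmms code; the region is an example value:

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")  # example region

# Examine every ami owned by this account; an ami is usable only if it
# carries a "Name" tag (case sensitive), whose value is the image name.
images = {}
for ami in ec2.describe_images(Owners=["self"])["Images"]:
    name_tags = [t["Value"] for t in ami.get("Tags", []) if t["Key"] == "Name"]
    if name_tags:
        images[name_tags[0]] = ami["ImageId"]
    else:
        # amis without a Name tag are ignored -- watch for a log message
        print("ignoring %s: no Name tag" % ami["ImageId"])

print(images)  # e.g. {"ubuntu.img": "ami-0123456789abcdef0"}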
diff --git a/deployment/config/redis.conf b/deployment/config/redis.conf
index 6c765691..f6992c06 100644
--- a/deployment/config/redis.conf
+++ b/deployment/config/redis.conf
@@ -32,6 +32,9 @@

 ################################ GENERAL #####################################

+### JMB - allow access from outside the container (also added port in docker-compose.yml)
+protected-mode yes
+
 # By default Redis does not run as a daemon. Use 'yes' if you need it.
 # Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
 daemonize no
diff --git a/deployment/config/supervisord.conf b/deployment/config/supervisord.conf
index f06b47f9..cc50e3b2 100644
--- a/deployment/config/supervisord.conf
+++ b/deployment/config/supervisord.conf
@@ -45,8 +45,8 @@ priority=1
 autostart=true
 autorestart=false
 user=root
-stdout_logfile=/var/log/redis_stdout.log
-stderr_logfile=/var/log/redis_stderr.log
+stdout_logfile=/var/log/tango/redis_stdout.log
+stderr_logfile=/var/log/tango/redis_stderr.log

 [program:nginx]
 command=/usr/sbin/nginx -c /etc/nginx/nginx.conf
@@ -70,11 +70,12 @@ command=/bin/bash -c 'sleep 5 && python /opt/TangoService/Tango/restful-tango/se
 autostart=true
 process_name=%(process_num)01d
 redirect_stderr=true
-stdout_logfile=/opt/TangoService/tango_log.log.%(process_num)01d
+stdout_logfile=/var/log/tango/restful-tango.%(process_num)01d.log
 numprocs=2

 [program:tangoJobManager]
 command=/bin/bash -c 'sleep 5 && python /opt/TangoService/Tango/jobManager.py'
 autostart=true
+autorestart=true
 redirect_stderr=true
-stdout_logfile=/opt/TangoService/tango_job_manager_log.log
+stdout_logfile=/var/log/tango/jobManager.log
diff --git a/jobManager.py b/jobManager.py
index 7ec31aee..15fef281 100644
--- a/jobManager.py
+++ b/jobManager.py
@@ -9,7 +9,7 @@
 # is launched that will handle things from here on. If anything goes
 # wrong, the job is made dead with the error.
 #
-import threading, logging, time, copy
+import threading, logging, time, copy, os
 from datetime import datetime
 from tango import *
@@ -27,10 +27,11 @@ def __init__(self, queue):
         self.jobQueue = queue
         self.preallocator = self.jobQueue.preallocator
         self.vmms = self.preallocator.vmms
-        self.log = logging.getLogger("JobManager")
+        self.log = logging.getLogger("JobManager-" + str(os.getpid()))
         # job-associated instance id
         self.nextId = 10000
         self.running = False
+        self.log.info("START jobManager")

     def start(self):
         if self.running:
@@ -51,8 +52,9 @@ def _getNextID(self):
         """
         id = self.nextId
         self.nextId += 1
-        if self.nextId > 99999:
-            self.nextId = 10000
+        # xxxXXX??? simply wrapping the id without a guarding condition is bad; disabled for now.
+        # if self.nextId > 99999:
+        #     self.nextId = 10000
         return id

     def __manage(self):
@@ -61,37 +63,51 @@
             id = self.jobQueue.getNextPendingJob()

             if id:
+                self.log.info("_manage: next job id %s" % id)
+
+                job = self.jobQueue.get(id)
+                if job is not None:
+                    jobStr = ', '.join("%s: %s" % item for item in job.__dict__.items())
+                    # self.log.info("_manage job %s" % jobStr)
                 if not job.accessKey and Config.REUSE_VMS:
                     id, vm = self.jobQueue.getNextPendingJobReuse(id)
                     job = self.jobQueue.get(id)
-
+                    if job is not None:
+                        jobStr = ', '.join("%s: %s" % item for item in job.__dict__.items())
+                        self.log.info("_manage after getNextPendingJobReuse %s" % jobStr)
+                    else:
+                        self.log.info("_manage after getNextPendingJobReuse %s %s" % (id, vm))
                 try:
                     # Mark the job assigned
                     self.jobQueue.assignJob(job.id)
+                    self.log.info("_manage after assignJob %s" % id)
                     # if the job has specified an account
                     # create an VM on the account and run on that instance
                     if job.accessKeyId:
                         from vmms.ec2SSH import Ec2SSH
                         vmms = Ec2SSH(job.accessKeyId, job.accessKey)
                         newVM = copy.deepcopy(job.vm)
-                        newVM.id = self._getNextID()
+                        newVM.id = self._getNextID()  # xxxXXX??? try this path
                         preVM = vmms.initializeVM(newVM)
+                        self.log.info("_manage init new vm %s" % preVM.id)
                     else:
                         # Try to find a vm on the free list and allocate it to
                         # the worker if successful.
if Config.REUSE_VMS: preVM = vm + self.log.info("_manage use vm %s" % preVM.id) else: - preVM = self.preallocator.allocVM(job.vm.name) + # xxxXXX??? strongly suspect this code path doesn't work. + # After setting REUSE_VMS to False, job submissions don't run. + preVM = self.preallocator.allocVM(job.vm.pool) + self.log.info("_manage allocate vm %s" % preVM.id) vmms = self.vmms[job.vm.vmms] # Create new vmms object # Now dispatch the job to a worker self.log.info("Dispatched job %s:%d to %s [try %d]" % (job.name, job.id, preVM.name, job.retries)) - job.appendTrace( - "%s|Dispatched job %s:%d [try %d]" % - (datetime.utcnow().ctime(), job.name, job.id, job.retries)) + job.appendTrace("Dispatched job %s:%d to %s [try %d]" % + (job.name, job.id, preVM.name, job.retries)) Worker( job, @@ -102,7 +118,10 @@ def __manage(self): ).start() except Exception as err: - self.jobQueue.makeDead(job.id, str(err)) + if job is not None: + self.jobQueue.makeDead(job.id, str(err)) + else: + self.log.info("_manage: job is None") # Sleep for a bit and then check again time.sleep(Config.DISPATCH_PERIOD) @@ -117,9 +136,6 @@ def __manage(self): tango = TangoServer() tango.log.debug("Resetting Tango VMs") tango.resetTango(tango.preallocator.vmms) - for key in tango.preallocator.machines.keys(): - tango.preallocator.machines.set(key, [[], TangoQueue(key)]) jobs = JobManager(tango.jobQueue) - - print("Starting the stand-alone Tango JobManager") + tango.log.info("Starting the stand-alone Tango JobManager") jobs.run() diff --git a/jobQueue.py b/jobQueue.py index ad43e3b9..e3606cc5 100644 --- a/jobQueue.py +++ b/jobQueue.py @@ -90,8 +90,7 @@ def add(self, job): self.log.debug("add| Acquired lock to job queue.") self.liveJobs.set(job.id, job) - job.appendTrace("%s|Added job %s:%d to queue" % - (datetime.utcnow().ctime(), job.name, job.id)) + job.appendTrace("Added job %s:%d to queue" % (job.name, job.id)) self.log.debug("Ref: " + str(job._remoteLocation)) self.log.debug("job_id: " + str(job.id)) @@ -207,15 +206,22 @@ def getNextPendingJobReuse(self, target_id=None): # if target_id is set, only interested in this id if target_id and target_id != id: continue - # Create a pool if necessary - if self.preallocator.poolSize(job.vm.name) == 0: - self.preallocator.update(job.vm, Config.POOL_SIZE) + + # Create or enlarge a pool if there is no free vm to use and + # the limit for pool is not reached yet + if self.preallocator.freePoolSize(job.vm.pool) == 0 and \ + self.preallocator.poolSize(job.vm.pool) < Config.POOL_SIZE: + increment = 1 + if hasattr(Config, 'POOL_ALLOC_INCREMENT') and Config.POOL_ALLOC_INCREMENT: + increment = Config.POOL_ALLOC_INCREMENT + self.preallocator.incrementPoolSize(job.vm, increment) # If the job hasn't been assigned to a worker yet, see if there # is a free VM if (job.isNotAssigned()): - vm = self.preallocator.allocVM(job.vm.name) + vm = self.preallocator.allocVM(job.vm.pool) if vm: + self.log.info("getNextPendingJobReuse alloc vm %s to job %s" % (vm, id)) self.queueLock.release() return (id, vm) @@ -256,7 +262,7 @@ def unassignJob(self, jobId): def makeDead(self, id, reason): """ makeDead - move a job from live queue to dead queue """ - self.log.info("makeDead| Making dead job ID: " + str(id)) + self.log.info("makeDead| Making dead job ID: " + str(id) + " " + reason) self.queueLock.acquire() self.log.debug("makeDead| Acquired lock to job queue.") status = -1 @@ -268,7 +274,7 @@ def makeDead(self, id, reason): (job.name, job.id, reason)) self.deadJobs.set(id, job) self.liveJobs.delete(id) - 
job.appendTrace("%s|%s" % (datetime.utcnow().ctime(), reason))
+        job.appendTrace(reason)
         self.queueLock.release()
         self.log.debug("makeDead| Released lock to job queue.")
         return status
diff --git a/major_fixes_by_PDL b/major_fixes_by_PDL
new file mode 100644
index 00000000..ee2aa54d
--- /dev/null
+++ b/major_fixes_by_PDL
@@ -0,0 +1,91 @@
+This is the content of a 2018 email.
+
+Part 1 is a list of the major bug fixes and improvements (some with
+relevant commits), followed by Part 2, a list of new configuration
+variables. Note that the commits may not be self-contained because
+they may themselves be buggy and have follow-up commits. They are here to
+help understand the nature of the bugs and enhancements.
+
+Part 1. Bug fixes and enhancements
+
+The following two bugs, combined, prevent pending jobs from being executed:
+* When the number of jobs is larger than the number of vms in the free pool,
+jobManager dies.
+* When jobManager restarts, the free pool is not emptied whilst the total pool
+is, causing inconsistency.
+https://github.com/xyzisinus/Tango/commit/4dcbbb4dfef096f3e64ef91f3eff4bf9d82b66b6
+
+https://github.com/xyzisinus/Tango/commit/e2afe8a7d73bbd633282a35ec71ea690d2bb1db0
+
+
+* Add the ability to specify an image name for ec2 using the "Name" tag on an AMI
+(used to allow only one image, specified as DEFAULT_AMI):
+https://github.com/xyzisinus/Tango/commit/97c22e39bcadf37b784cc2a0db5ea6202a5634ab
+
+https://github.com/xyzisinus/Tango/commit/e66551a53223b31c3baef74860eb845e4c2adac1
+
+
+* When the job id reaches the max and wraps around, the jobs with larger ids
+starve.
+https://github.com/xyzisinus/Tango/commit/9565275dab5d0fa614b96b33bad642559f7714a4
+
+
+* Improve the worker's run() function to report errors on the
+copy-in/exec/copy-out path more precisely.
+https://github.com/xyzisinus/Tango/commit/caac9b46733716ed30feb62646d750a7accdd4f7
+
+https://github.com/xyzisinus/Tango/commit/c47d8891a54f8cccef3ba4abd2938fa49c906dd1
+
+
+* In the original code, Tango allocates all vm instances allowed by
+POOL_SIZE at once. It shouldn't be an issue because once a vm is made
+ready a pending job should start using it. However, due to well-known
+Python thread scheduling problems, the pending jobs will not run until
+all vms are allocated. As we observed, vm allocations are almost
+sequential although each allocation runs in a separate thread, again due
+to Python's threading. That results in a long delay for the first job
+to start running. To get around the problem, POOL_ALLOC_INCREMENT is
+added to incrementally allocate vms and allow jobs to start running sooner.
+https://github.com/xyzisinus/Tango/commit/93e60ada803514d4164237f5043bee95671259aa
+
+
+* With POOL_SIZE_LOW_WATER_MARK, add the ability to shrink the pool size
+when there are extra vms in the free pool. When the low water mark is set to
+zero, no vms are kept in the free pool and a fresh vm is allocated for every
+job and destroyed afterward. It is used to maintain the desired number of
+ec2 machines as standbys in the pool while terminating extra vms to save
+money.
+https://github.com/xyzisinus/Tango/commit/d896b360f6c8111a6be81df89bd43917519dd581
+
+https://github.com/xyzisinus/Tango/commit/780557749cd14c272aad6a7ea4d5e04ff2ac18ed
+
+
+* Improve autodriver with accurate error reporting and optional time
+stamp insertion into the job output.
+Tango/autodriver/autodriver.c
+
+* When Tango restarts, vms in the free pool are preserved (they used to be all
+destroyed).
+https://github.com/xyzisinus/Tango/commit/e2afe8a7d73bbd633282a35ec71ea690d2bb1db0
+
+
+* Add a run_jobs script to submit existing student handins in large numbers:
+Tango/tools/run_jobs.py
+
+* Improve general logging by adding the pid in logs and messages at critical
+execution points.
+
+Part 2. New configuration variables (all optional)
+
+* Passed to autodriver to enhance readability of the output file.
+Currently only integrated in the ec2 vmms.
+AUTODRIVER_LOGGING_TIME_ZONE
+AUTODRIVER_TIMESTAMP_INTERVAL
+
+* Control of the preallocator pool as explained in Part 1.
+POOL_SIZE_LOW_WATER_MARK
+POOL_ALLOC_INCREMENT
+
+* Instead of destroying it, set the vm aside for further investigation
+after autodriver returns OS ERROR. Currently only integrated in the ec2 vmms.
+KEEP_VM_AFTER_FAILURE
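To make the pool-control knobs from the notes above concrete, the three variables combine like this in config.py (the values are illustrative; the semantics follow the email and config.template.py):

class Config:
    # Hard cap: a pool is never grown beyond this many vms.
    POOL_SIZE = 10

    # Keep at most this many idle vms standing by; a freed vm beyond this
    # mark is destroyed instead of returned to the free pool. 0 means a
    # fresh vm for every job, destroyed afterward.
    POOL_SIZE_LOW_WATER_MARK = 5

    # Grow the pool by this many vms at a time instead of all POOL_SIZE at
    # once, so the first jobs start sooner despite Python's mostly
    # sequential thread scheduling. None is treated as 1.
    POOL_ALLOC_INCREMENT = 2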
diff --git a/preallocator.py b/preallocator.py
index 026c09f5..82bce04d 100644
--- a/preallocator.py
+++ b/preallocator.py
@@ -1,7 +1,7 @@
 #
 # preallocator.py - maintains a pool of active virtual machines
 #
-import threading, logging, time, copy
+import threading, logging, time, copy, os
 from tangoObjects import TangoDictionary, TangoQueue, TangoIntValue
 from config import Config
@@ -9,8 +9,8 @@
 #
 # Preallocator - This class maintains a pool of active VMs for future
 # job requests. The pool is stored in dictionary called
-# "machines". This structure keys off the name of the TangoMachine
-# (.name). The values of this dictionary are two-element arrays:
+# "machines". This structure keys off the pool of the TangoMachine
+# (.pool). The values of this dictionary are two-element arrays:
 # Element 0 is the list of the IDs of the current VMs in this pool.
 # Element 1 is a queue of the VMs in this pool that are available to
 # be assigned to workers.
@@ -24,7 +24,7 @@ def __init__(self, vmms):
         self.lock = threading.Lock()
         self.nextID = TangoIntValue("nextID", 1000)
         self.vmms = vmms
-        self.log = logging.getLogger("Preallocator")
+        self.log = logging.getLogger("Preallocator-" + str(os.getpid()))

     def poolSize(self, vmName):
         """ poolSize - returns the size of the vmName pool, for external callers
         """
         if vmName not in self.machines.keys():
             return 0
         else:
             return len(self.machines.get(vmName)[0])

@@ -34,6 +34,30 @@
+    def freePoolSize(self, vmName):
+        """ freePoolSize - returns the size of the vmName free pool, for external callers
+        """
+        if vmName in self.machines.keys():
+            return self.machines.get(vmName)[1].qsize()
+        else:
+            return 0
+
+    def incrementPoolSize(self, vm, delta):
+        """
+        Called by jobQueue to create the pool and allocate the given number of vms
+        """
+
+        self.lock.acquire()
+        if vm.pool not in self.machines.keys():
+            self.machines.set(vm.pool, [[], TangoQueue(vm.pool)])
+            # see comments in jobManager.py for the same call
+            self.machines.get(vm.pool)[1].make_empty()
+            self.log.debug("Creating empty pool of %s instances" % (vm.pool))
+        self.lock.release()
+
+        self.log.debug("incrementPoolSize: add %d new vms to pool %s" % (delta, vm.pool))
+        threading.Thread(target=self.__create(vm, delta)).start()
+
     def update(self, vm, num):
         """ update - Updates the number of machines of a certain type
         to be preallocated.
@@ -44,23 +68,25 @@ of machines as necessary.
""" self.lock.acquire() - if vm.name not in self.machines.keys(): - self.machines.set(vm.name, [[], TangoQueue(vm.name)]) - self.log.debug("Creating empty pool of %s instances" % (vm.name)) + if vm.pool not in self.machines.keys(): + self.machines.set(vm.pool, [[], TangoQueue(vm.pool)]) + # see comments in jobManager.py for the same call + self.machines.get(vm.pool)[1].make_empty() + self.log.debug("Creating empty pool %s" % (vm.pool)) self.lock.release() - delta = num - len(self.machines.get(vm.name)[0]) + delta = num - len(self.machines.get(vm.pool)[0]) if delta > 0: # We need more self.machines, spin them up. self.log.debug( - "update: Creating %d new %s instances" % (delta, vm.name)) + "update: Creating %d new vms in pool %s" % (delta, vm.pool)) threading.Thread(target=self.__create(vm, delta)).start() elif delta < 0: # We have too many self.machines, remove them from the pool self.log.debug( - "update: Destroying %d preallocated %s instances" % - (-delta, vm.name)) + "update: Destroying %d preallocated vms in pool %s" % + (-delta, vm.pool)) for i in range(-1 * delta): threading.Thread(target=self.__destroy(vm)).start() @@ -79,49 +105,105 @@ def allocVM(self, vmName): self.lock.release() # If we're not reusing instances, then crank up a replacement + # xxxXXX??? test this code path if vm and not Config.REUSE_VMS: threading.Thread(target=self.__create(vm, 1)).start() return vm + def addToFreePool(self, vm): + """ addToFreePool - Returns a VM instance to the free list + """ + + self.lock.acquire() + machine = self.machines.get(vm.pool) + self.log.info("addToFreePool: add vm %s to free pool" % vm.name) + machine[1].put(vm) + self.machines.set(vm.pool, machine) + self.lock.release() + def freeVM(self, vm): """ freeVM - Returns a VM instance to the free list """ # Sanity check: Return a VM to the free list only if it is # still a member of the pool. not_found = False + should_destroy = False self.lock.acquire() - if vm and vm.id in self.machines.get(vm.name)[0]: - machine = self.machines.get(vm.name) - machine[1].put(vm) - self.machines.set(vm.name, machine) + if vm and vm.id in self.machines.get(vm.pool)[0]: + if (hasattr(Config, 'POOL_SIZE_LOW_WATER_MARK') and + Config.POOL_SIZE_LOW_WATER_MARK >= 0 and + vm.pool in self.machines.keys() and + self.freePoolSize(vm.pool) >= Config.POOL_SIZE_LOW_WATER_MARK): + self.log.info("freeVM: over low water mark. will destroy %s" % vm.id) + should_destroy = True + else: + machine = self.machines.get(vm.pool) + self.log.info("freeVM: return %s to free pool" % vm.id) + machine[1].put(vm) + self.machines.set(vm.pool, machine) else: + self.log.info("freeVM: %s not found in pool. Will destroy" % vm.name) not_found = True self.lock.release() # The VM is no longer in the pool. - if not_found: + if not_found or should_destroy: + self.log.info("freeVM: will destroy %s" % vm.id) vmms = self.vmms[vm.vmms] + self.removeVM(vm) vmms.safeDestroyVM(vm) def addVM(self, vm): """ addVM - add a particular VM instance to the pool """ self.lock.acquire() - machine = self.machines.get(vm.name) + machine = self.machines.get(vm.pool) machine[0].append(vm.id) - self.machines.set(vm.name, machine) + self.log.info("addVM: add vm %s" % vm.name) + self.machines.set(vm.pool, machine) + self.lock.release() + + # Note: This function is called from removeVM() to handle the case when a vm + # is in free pool. In theory this should never happen but we want to ensure + # that. 
To solve the problem cleanly, preallocator should provide ONE primitive + # to add/remove a vm from both total and free pools, instead of two disjoint ones. + def removeFromFreePool(self, vm): + self.lock.acquire() + size = self.machines.get(vm.pool)[1].qsize() + self.log.info("removeFromFreePool: %s" % vm.name) + for i in range(size): # go through free pool + freeVM = self.machines.get(vm.pool)[1].get_nowait() + # put it back into free pool, if not our vm + if vm.id != freeVM.id: + self.machines.get(vm.pool)[1].put(freeVM) + else: + self.log.info("removeFromFreePool: found %s in pool" % vm.name) + # don't put this particular vm back to free pool, that is removal self.lock.release() - def removeVM(self, vm): + # return True if the vm is in the pool (and removed) + def removeVM(self, vm, mustFind=True): """ removeVM - remove a particular VM instance from the pool """ self.lock.acquire() - machine = self.machines.get(vm.name) + machine = self.machines.get(vm.pool) + if not machine or vm.id not in machine[0]: + if mustFind: + self.log.error("removeVM: %s NOT found in pool" % vm.name) + else: + self.log.info("removeVM: %s NOT found in pool. This is OK" % vm.name) + self.lock.release() + return False + + self.log.info("removeVM: %s" % vm.name) machine[0].remove(vm.id) - self.machines.set(vm.name, machine) + self.machines.set(vm.pool, machine) self.lock.release() + self.removeFromFreePool(vm) # also remove from free pool, just in case + return True + def _getNextID(self): """ _getNextID - returns next ID to be used for a preallocated VM. Preallocated VM's have 4-digit ID numbers between 1000 @@ -132,6 +214,7 @@ def _getNextID(self): self.nextID.increment() + # xxxXXX??? shouldn't reset if self.nextID.get() > 9999: self.nextID.set(1000) @@ -144,20 +227,23 @@ def __create(self, vm, cnt): This function should always be called in a thread since it might take a long time to complete. """ + vmms = self.vmms[vm.vmms] self.log.debug("__create: Using VMMS %s " % (Config.VMMS_NAME)) for i in range(cnt): newVM = copy.deepcopy(vm) newVM.id = self._getNextID() - self.log.debug("__create|calling initializeVM") - vmms.initializeVM(newVM) - self.log.debug("__create|done with initializeVM") + self.log.debug("__create|calling initializeVM with id %d" % newVM.id) + ret = vmms.initializeVM(newVM) + if not ret: # ret is None when fails + self.log.debug("__create|failed initializeVM with id %d" % newVM.id) + continue + self.log.debug("__create|done initializeVM with id %d" % newVM.id) time.sleep(Config.CREATEVM_SECS) self.addVM(newVM) - self.freeVM(newVM) - self.log.debug("__create: Added vm %s to pool %s " % - (newVM.id, newVM.name)) + self.addToFreePool(newVM) + self.log.debug("__create: Added vm %s to pool" % newVM.name) def __destroy(self, vm): """ __destroy - Removes a VM from the pool @@ -169,10 +255,11 @@ def __destroy(self, vm): the free list is empty. 
""" self.lock.acquire() - dieVM = self.machines.get(vm.name)[1].get_nowait() + dieVM = self.machines.get(vm.pool)[1].get_nowait() self.lock.release() if dieVM: + self.log.info("__destroy: %s" % dieVM.name) self.removeVM(dieVM) vmms = self.vmms[vm.vmms] vmms.safeDestroyVM(dieVM) @@ -187,14 +274,17 @@ def createVM(self, vm): newVM.id = self._getNextID() self.log.info("createVM|calling initializeVM") - vmms.initializeVM(newVM) - self.log.info("createVM|done with initializeVM") + ret = vmms.initializeVM(newVM) + if not ret: + self.log.debug("createVM|failed initializeVM with id %d", newVM.id) + return self.addVM(newVM) - self.freeVM(newVM) - self.log.debug("createVM: Added vm %s to pool %s" % - (newVM.id, newVM.name)) + self.addToFreePool(newVM) + self.log.info("createVM|done with initializeVM %s" % newVM.name) + # xxxXXX??? most likely unused, only called by delVM() + ''' def destroyVM(self, vmName, id): """ destroyVM - Called by the delVM API function to remove and destroy a particular VM instance from a pool. We only allow @@ -207,12 +297,15 @@ def destroyVM(self, vmName, id): dieVM = None self.lock.acquire() size = self.machines.get(vmName)[1].qsize() + self.log.info("destroyVM: free:total pool %d:%d" % (size, len(self.machines.get(vmName)[0]))) if (size == len(self.machines.get(vmName)[0])): for i in range(size): vm = self.machines.get(vmName)[1].get_nowait() if vm.id != id: + self.log.info("destroyVM: put to free pool id:vm.id %s:%s" % (id, vm.id)) self.machines.get(vmName)[1].put(vm) else: + self.log.info("destroyVM: will call removeVM %s" % id) dieVM = vm self.lock.release() @@ -223,6 +316,7 @@ def destroyVM(self, vmName, id): return 0 else: return -1 + ''' def getAllPools(self): result = {} @@ -230,26 +324,29 @@ def getAllPools(self): result[vmName] = self.getPool(vmName) return result - def getPool(self, vmName): + def getPool(self, pool): """ getPool - returns the members of a pool and its free list """ result = {} - if vmName not in self.machines.keys(): + if pool not in self.machines.keys(): return result result["total"] = [] result["free"] = [] free_list = [] self.lock.acquire() - size = self.machines.get(vmName)[1].qsize() + size = self.machines.get(pool)[1].qsize() for i in range(size): - vm = self.machines.get(vmName)[1].get_nowait() + vm = self.machines.get(pool)[1].get_nowait() free_list.append(vm.id) - machine = self.machines.get(vmName) + machine = self.machines.get(pool) machine[1].put(vm) - self.machines.set(vmName, machine) + self.machines.set(pool, machine) self.lock.release() - result["total"] = self.machines.get(vmName)[0] + result["total"] = self.machines.get(pool)[0] result["free"] = free_list + self.log.info("getPool %s: free pool %s" % (pool, result["free"])) + self.log.info("getPool %s: total pool %s" % (pool, result["total"])) + return result diff --git a/requirements.txt b/requirements.txt index e5a20156..28d7484f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ backports.ssl-match-hostname==3.4.0.2 +boto3 boto==2.27.0 futures==2.2.0 plumbum==1.4.2 diff --git a/restful-tango/server.py b/restful-tango/server.py index ef7ff653..bb1068f5 100755 --- a/restful-tango/server.py +++ b/restful-tango/server.py @@ -143,6 +143,5 @@ def post(self, key, image, num): if len(sys.argv) > 1: port = int(sys.argv[1]) - tangoREST.tango.resetTango(tangoREST.tango.preallocator.vmms) application.listen(port, max_buffer_size=Config.MAX_INPUT_FILE_SIZE) tornado.ioloop.IOLoop.instance().start() diff --git a/restful-tango/tangoREST.py b/restful-tango/tangoREST.py 
index c61dff1c..8873004d 100644 --- a/restful-tango/tangoREST.py +++ b/restful-tango/tangoREST.py @@ -120,7 +120,6 @@ def createTangoMachine(self, image, vmms=Config.VMMS_NAME, """ createTangoMachine - Creates a tango machine object from image """ return TangoMachine( - name=image, vmms=vmms, image="%s" % (image), cores=vmObj["cores"], @@ -193,6 +192,8 @@ def convertTangoMachineObj(self, tangoMachine): vm['disk'] = tangoMachine.disk vm['id'] = tangoMachine.id vm['name'] = tangoMachine.name + vm['pool'] = tangoMachine.pool + vm['instance_type'] = tangoMachine.instance_type return vm def convertInputFileObj(self, inputFile): @@ -369,11 +370,11 @@ def jobs(self, key, deadJobs): if (int(deadJobs) == 0): jobs = self.tango.getJobs(0) self.log.debug( - "Retrieved live jobs (deadJobs = %s)" % deadJobs) + "Retrieved %d live jobs (deadJobs = %s)" % (len(jobs), deadJobs)) elif (int(deadJobs) == 1): jobs = self.tango.getJobs(-1) self.log.debug( - "Retrieved dead jobs (deadJobs = %s)" % deadJobs) + "Retrieved %d dead jobs (deadJobs = %s)" % (len(jobs), deadJobs)) result['jobs'] = list() for job in jobs: result['jobs'].append(self.convertTangoJobObj(job)) @@ -428,7 +429,7 @@ def prealloc(self, key, image, num, vmStr): self.log.error("Invalid prealloc size") return self.status.invalid_prealloc_size if ret == -3: - self.log.error("Invalid image name") + self.log.error("Invalid image name: %s" % image) return self.status.invalid_image self.log.info("Successfully preallocated VMs") return self.status.preallocated diff --git a/tango.py b/tango.py index 058c9930..418c372f 100755 --- a/tango.py +++ b/tango.py @@ -52,7 +52,14 @@ class TangoServer: def __init__(self): self.daemon = True - + + # init logging early, or some logging will be lost + logging.basicConfig( + filename=Config.LOGFILE, + format="%(levelname)s|%(asctime)s|%(name)s|%(message)s", + level=Config.LOGLEVEL, + ) + vmms = None if Config.VMMS_NAME == "tashiSSH": from vmms.tashiSSH import TashiSSH @@ -74,12 +81,7 @@ def __init__(self): # memory between processes. Otherwise, JobManager will # be initiated separately JobManager(self.jobQueue).start() - - logging.basicConfig( - filename=Config.LOGFILE, - format="%(levelname)s|%(asctime)s|%(name)s|%(message)s", - level=Config.LOGLEVEL, - ) + self.start_time = time.time() self.log = logging.getLogger("TangoServer") self.log.info("Starting Tango server") @@ -128,8 +130,8 @@ def getJobs(self, item): def preallocVM(self, vm, num): """ preallocVM - Set the pool size for VMs of type vm to num """ - self.log.debug("Received preallocVM(%s,%d)request" - % (vm.name, num)) + self.log.debug("Received preallocVM request: %d vms in pool %s" + % (num, vm.pool)) try: vmms = self.preallocator.vmms[vm.vmms] if not vm or num < 0: @@ -137,8 +139,6 @@ def preallocVM(self, vm, num): if vm.image not in vmms.getImages(): self.log.error("Invalid image name") return -3 - (name, ext) = os.path.splitext(vm.image) - vm.name = name self.preallocator.update(vm, num) return 0 except Exception as err: @@ -159,6 +159,8 @@ def getVMs(self, vmms_name): self.log.error("getVMs request failed: %s" % err) return [] + # xxxXXX??? 
plan to remove
+    '''
     def delVM(self, vmName, id):
         """ delVM - delete a specific VM instance from a pool """
@@ -170,6 +172,7 @@
         except Exception as err:
             self.log.error("delVM request failed: %s" % err)
             return -1
+    '''

     def getPool(self, vmName):
         """ getPool - Return the current members of a pool and its free list
         """
@@ -201,37 +204,70 @@
         stats['runjob_errors'] = Config.runjob_errors
         stats['copyout_errors'] = Config.copyout_errors
         stats['num_threads'] = threading.activeCount()
-
+        isdst = (time.localtime().tm_isdst > 0)
+        stats['timezone_offset'] = time.altzone if isdst else time.timezone
+        (zone, daylight) = time.tzname
+        stats['timezone_name'] = zone + ("" if not daylight else ("/" + daylight))
+
         return stats

     #
     # Helper functions
     #
+
+    # NOTE: This function should be called ONLY by jobManager. The REST servers
+    # shouldn't call this function.
     def resetTango(self, vmms):
         """ resetTango - resets Tango to a clean predictable state and
         ensures that it has a working virtualization environment. A side
         effect is that also checks that each supported VMMS is actually
         running.
         """
+
+        # There are two cases in which this function is called: 1. Tango has a
+        # fresh start. Then we want to destroy all instances in Tango's name space.
+        # 2. Job Manager is restarted after a previous crash. Then we want to destroy
+        # the "busy" instances prior to the crash and leave the "free" ones intact.
+
         self.log.debug("Received resetTango request.")

         try:
-            # For each supported VMM system, get the instances it knows about,
-            # and kill those in the current Tango name space.
+            # For each supported VMM system, get the instances it knows about
+            # in the current Tango name space and kill those not in free pools.
             for vmms_name in vmms:
                 vobj = vmms[vmms_name]
+
+                # Round up all instances in the free pools.
+                allFreeVMs = []
+                for key in self.preallocator.machines.keys():
+                    freePool = self.preallocator.getPool(key)["free"]
+                    for vmId in freePool:
+                        allFreeVMs.append(vobj.instanceName(vmId, key))
+                self.log.info("vms in all free pools: %s" % allFreeVMs)
+
+                # allFreeVMs = []
+
+                # For each vm in Tango's name space, destroy the ones not in a free
+                # pool AND remove them from Tango's internal bookkeeping.
                vms = vobj.getVMs()
                self.log.debug("Pre-existing VMs: %s" % [vm.name for vm in vms])
-                namelist = []
+
+                destroyedList = []
+                removedList = []
                for vm in vms:
                    if re.match("%s-" % Config.PREFIX, vm.name):
-                        vobj.destroyVM(vm)
-                        # Need a consistent abstraction for a vm between
-                        # interfaces
-                        namelist.append(vm.name)
-                if namelist:
+                        if vm.name not in allFreeVMs:
+                            destroyedList.append(vm.name)
+                            if self.preallocator.removeVM(vm, mustFind=False):
+                                removedList.append(vm.name)
+                            vobj.destroyVM(vm)
+
+                if destroyedList:
                    self.log.warning("Killed these %s VMs on restart: %s" %
-                                     (vmms_name, namelist))
+                                     (vmms_name, destroyedList))
+                if removedList:
+                    self.log.warning("Removed these VMs from their pools: %s" %
+                                     (removedList))

            for _, job in self.jobQueue.liveJobs.iteritems():
                if not job.isNotAssigned():
@@ -256,57 +292,47 @@ def __validateJob(self, job, vmms):
        # Every job must have a name
        if not job.name:
            self.log.error("validateJob: Missing job.name")
-            job.appendTrace("%s|validateJob: Missing job.name" %
-                            (datetime.utcnow().ctime()))
+            job.appendTrace("validateJob: Missing job.name")
            errors += 1

        # Check the virtual machine field
        if not job.vm:
            self.log.error("validateJob: Missing job.vm")
-            job.appendTrace("%s|validateJob: Missing job.vm" %
-                            (datetime.utcnow().ctime()))
+            job.appendTrace("validateJob: Missing job.vm")
            errors += 1
        else:
            if not job.vm.image:
                self.log.error("validateJob: Missing job.vm.image")
-                job.appendTrace("%s|validateJob: Missing job.vm.image" %
-                                (datetime.utcnow().ctime()))
+                job.appendTrace("validateJob: Missing job.vm.image")
                errors += 1
            else:
                vobj = vmms[Config.VMMS_NAME]
                imgList = vobj.getImages()
                if job.vm.image not in imgList:
-                    self.log.error("validateJob: Image not found: %s" %
-                                   job.vm.image)
-                    job.appendTrace("%s|validateJob: Image not found: %s" %
-                                    (datetime.utcnow().ctime(), job.vm.image))
+                    self.log.error("validateJob: Image not found: %s" % job.vm.image)
+
+                    job.appendTrace("validateJob: Image not found: %s" % job.vm.image)
                    errors += 1
-                else:
-                    (name, ext) = os.path.splitext(job.vm.image)
-                    job.vm.name = name

            if not job.vm.vmms:
                self.log.error("validateJob: Missing job.vm.vmms")
-                job.appendTrace("%s|validateJob: Missing job.vm.vmms" %
-                                (datetime.utcnow().ctime()))
+                job.appendTrace("validateJob: Missing job.vm.vmms")
                errors += 1
            else:
                if job.vm.vmms not in vmms:
                    self.log.error("validateJob: Invalid vmms name: %s" % job.vm.vmms)
-                    job.appendTrace("%s|validateJob: Invalid vmms name: %s" %
-                                    (datetime.utcnow().ctime(), job.vm.vmms))
+                    job.appendTrace("validateJob: Invalid vmms name: %s" % job.vm.vmms)
                    errors += 1

        # Check the output file
        if not job.outputFile:
            self.log.error("validateJob: Missing job.outputFile")
-            job.appendTrace("%s|validateJob: Missing job.outputFile" % (datetime.utcnow().ctime()))
+            job.appendTrace("validateJob: Missing job.outputFile")
            errors += 1
        else:
            if not os.path.exists(os.path.dirname(job.outputFile)):
-                self.log.error("validateJob: Bad output path: %s", job.outputFile)
-                job.appendTrace("%s|validateJob: Bad output path: %s" %
-                                (datetime.utcnow().ctime(), job.outputFile))
+                self.log.error("validateJob: Bad output path: %s" % job.outputFile)
+                job.appendTrace("validateJob: Bad output path: %s" % job.outputFile)
                errors += 1

        # Check for max output file size parameter
@@ -320,14 +346,12 @@ def __validateJob(self, job, vmms):
        for inputFile in job.input:
            if not inputFile.localFile:
                self.log.error("validateJob: Missing inputFile.localFile")
-                job.appendTrace("%s|validateJob: Missing inputFile.localFile" %
-                                (datetime.utcnow().ctime()))
+ job.appendTrace("validateJob: Missing inputFile.localFile") errors += 1 else: if not os.path.exists(os.path.dirname(job.outputFile)): - self.log.error("validateJob: Bad output path: %s", job.outputFile) - job.appendTrace("%s|validateJob: Bad output path: %s" % - (datetime.utcnow().ctime(), job.outputFile)) + self.log.error("validateJob: Bad output path: %s" % job.outputFile) + job.appendTrace("validateJob: Bad output path: %s" % job.outputFile) errors += 1 if inputFile.destFile == 'Makefile': @@ -336,8 +360,8 @@ def __validateJob(self, job, vmms): # Check if input files include a Makefile if not hasMakefile: self.log.error("validateJob: Missing Makefile in input files.") - job.appendTrace("%s|validateJob: Missing Makefile in input files." % (datetime.utcnow().ctime())) - errors+=1 + job.appendTrace("validateJob: Missing Makefile in input files.") + errors+=1 # Check if job timeout has been set; If not set timeout to default if not job.timeout or job.timeout <= 0: @@ -348,8 +372,7 @@ def __validateJob(self, job, vmms): # Any problems, return an error status if errors > 0: self.log.error("validateJob: Job rejected: %d errors" % errors) - job.appendTrace("%s|validateJob: Job rejected: %d errors" % - (datetime.utcnow().ctime(), errors)) + job.appendTrace("validateJob: Job rejected: %d errors" % errors) return -1 else: return 0 diff --git a/tangoObjects.py b/tangoObjects.py index 17e4130f..2c951d09 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -2,17 +2,24 @@ # # Implements objects used to pass state within Tango. # +import os import redis import pickle import Queue +import logging +from datetime import datetime from config import Config redisConnection = None - -def getRedisConnection(): +# Pass in an existing connection to redis, sometimes necessary for testing. +def getRedisConnection(connection=None): global redisConnection if redisConnection is None: + if connection: + redisConnection = connection + return redisConnection + redisConnection = redis.StrictRedis( host=Config.REDIS_HOSTNAME, port=Config.REDIS_PORT, db=0) @@ -41,25 +48,42 @@ class TangoMachine(): TangoMachine - A description of the Autograding Virtual Machine """ - def __init__(self, name="DefaultTestVM", image=None, vmms=None, + def __init__(self, image=None, vmms=None, network=None, cores=None, memory=None, disk=None, - domain_name=None, ec2_id=None, resume=None, id=None, - instance_id=None): - self.name = name + domain_name=None): self.image = image + self.vmms = vmms self.network = network self.cores = cores self.memory = memory self.disk = disk - self.vmms = vmms self.domain_name = domain_name - self.ec2_id = ec2_id - self.resume = resume - self.id = id - self.instance_id = id + + self.resume = None + self.id = None + self.instance_id = None + self.instance_type = None + self.notes = None + + # The following attributes can instruct vmms to set the test machine + # aside for further investigation. + self.keepForDebugging = False + + self.pool = None + self.name = None # in the form of prefix-id-pool, constructed by the vmms + + # The image may contain instance type if vmms is ec2. Example: + # course101+t2.small. 
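+        # Illustrative walk-through (hypothetical names): an image string
+        # "course101.img+t2.small" yields image "course101.img", instance_type
+        # "t2.small", and pool "course101+t2.small"; a plain "course101.img"
+        # yields pool "course101" and the vmms's default instance type.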
+        if image:
+            imageParts = image.split('+')
+            if len(imageParts) == 2:
+                self.image = imageParts[0]
+                self.instance_type = imageParts[1]
+            (pool, ext) = os.path.splitext(self.image)
+            self.pool = pool + ("+" + self.instance_type if self.instance_type else "")

    def __repr__(self):
-        return "TangoMachine(image: %s, vmms: %s)" % (self.image, self.vmms)
+        return "TangoMachine(image: %s, vmms: %s, id: %s)" % (self.image, self.vmms, self.id)

class TangoJob():
@@ -91,6 +115,7 @@ def __init__(self, vm=None,
        self._remoteLocation = None
        self.accessKeyId = accessKeyId
        self.accessKey = accessKey
+        self.tm = datetime.now()

    def makeAssigned(self):
        self.syncRemote()
@@ -107,8 +132,9 @@ def isNotAssigned(self):
        return not self.assigned

    def appendTrace(self, trace_str):
+        # the trace attached to the object can be retrieved and sent to the REST API caller
        self.syncRemote()
-        self.trace.append(trace_str)
+        self.trace.append("%s|%s" % (datetime.now().ctime(), trace_str))
        self.updateRemote()

    def setId(self, new_id):
@@ -210,6 +236,14 @@ def __init__(self, name, namespace="queue"):
        self.__db = getRedisConnection()
        self.key = '%s:%s' % (namespace, name)

+    # For debugging: return the queued objects in a readable form
+    def dump(self):
+        pickled_objs = self.__db.lrange(self.key, 0, -1)
+        objs = []
+        for obj in pickled_objs:
+            objs.append(pickle.loads(obj))
+        return objs
+
    def qsize(self):
        """Return the approximate size of the queue."""
        return self.__db.llen(self.key)
@@ -239,6 +273,12 @@ def get(self, block=True, timeout=None):
            item = pickle.loads(item)
        return item

+    def make_empty(self):
+        while True:
+            item = self.__db.lpop(self.key)
+            if item is None:
+                break
+
    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)
@@ -268,6 +308,7 @@ class TangoRemoteDictionary():
    def __init__(self, object_name):
        self.r = getRedisConnection()
        self.hash_name = object_name
+        self.log = logging.getLogger("TangoRemoteDictionary")

    def set(self, id, obj):
        pickled_obj = pickle.dumps(obj)
@@ -305,8 +346,12 @@ def _clean(self):
        self.r.delete(self.hash_name)

    def iteritems(self):
-        return iter([(i, self.get(i)) for i in xrange(1,Config.MAX_JOBID+1)
-                     if self.get(i) != None])
+        # find all non-empty spots in the job id spectrum (actual jobs) and sort
+        # by the time of creation to prevent starvation of jobs with larger ids
+
+        return iter(sorted([(i, self.get(i)) for i in xrange(1,Config.MAX_JOBID+1)
+                            if self.get(i) != None], key=lambda x: x[1].tm))
+

class TangoNativeDictionary():

@@ -333,8 +378,8 @@ def delete(self, id):
            del self.dict[str(id)]

    def iteritems(self):
-        return iter([(i, self.get(i)) for i in xrange(1,Config.MAX_JOBID+1)
-                     if self.get(i) != None])
+        return iter(sorted([(i, self.get(i)) for i in xrange(1,Config.MAX_JOBID+1)
+                            if self.get(i) != None], key=lambda x: x[1].tm))

    def _clean(self):
        # only for testing
diff --git a/tools/check_jobs.py b/tools/check_jobs.py
new file mode 100644
index 00000000..1952951e
--- /dev/null
+++ b/tools/check_jobs.py
@@ -0,0 +1,127 @@
+import os, re, glob, datetime, time, json, string
+from dateutil import parser
+import smtplib
+from email.mime.text import MIMEText
+
+from config_for_run_jobs import Config
+from util import Cmd
+from util import CommandLine
+from util import Lab
+import util
+
+# This script is run as a cron job every minute to detect potentially
+# stuck jobs and send email to the administrator.
+# It asks Tango for the live jobs. Then it looks at the last-seen
+# timestamp in each job's trace to determine if it's a "slow" job.
+# It keeps the questionable jobs in a file so that they are not +# reported again by the next execution of this script. +# Potential false negative: Suppose Tango dies and is restarted, +# then the jobIds stored in the "reported jobs" file from Tango's last +# incarnation may overlap with the current jobIds. In such case, +# the overlapping jobIds will not be reported. However, when Tango +# is stuck there usually will be more stuck jobs to be reported for +# the admin's attention. + +cfg = Config() +cmd = Cmd(cfg, None) + +REPORTED_JOBS_PATH = "/var/log/tango/lastSeenSlowJobsBy_check_jobs" + +jsonResult = {} +reportedJobs = [] # trouble jobs found in last execution +troubleJobs = [] # trouble jobs found in this execution +writeFailure = "" +mailbodyP1 = "" +mailbodyP2 = "" + +def sendmail(): + global mailbodyP1, mailbodyP2 + + if not mailbodyP1 and not writeFailure: + print "No error to report @ %s" % datetime.datetime.now() + return + + print "email report @ %s" % datetime.datetime.now() + HOST = "smtp.pdl.local.cmu.edu" + SUBJECT = "Autolab trouble @ %s" % datetime.datetime.now() + FROM = "czang@cmu.edu" + TO = "czang@cmu.edu" + BODY = string.join(( + "From: %s" % FROM, + "To: %s" % TO, + "Subject: %s" % SUBJECT , + "", + writeFailure + mailbodyP1 + mailbodyP2 + ), "\r\n") + server = smtplib.SMTP(HOST) + server.sendmail(FROM, ["czang@cmu.edu", "jboles@cmu.edu"], BODY) + # server.sendmail(FROM, ["czang@cmu.edu"], BODY) + server.quit() + +def report(jobId, msg): + global mailbodyP1, mailbodyP2 + email = "" + + # add into trouble list but may not report this time + troubleJobs.append(jobId) + if jobId in reportedJobs: + return + + # go through the job list to find the job by jobId + for job in jsonResult["jobs"]: + if job["id"] != jobId: continue + if not mailbodyP1: + mailbodyP1 = "\nTrouble jobs:\n" + mailbodyP2 = "\nJob details:\n" + matchObj = re.match(r'(.*)_[0-9]+_(.*)', job["name"], re.M|re.I) + email = matchObj.group(2) + mailbodyP1 += "job %s, student %s: %s\n" % (jobId, email, msg) + mailbodyP2 += json.dumps(job, indent=2, sort_keys=True) + +# use a dump file for testing +if 0: + with open('./testData') as jsonData: + jsonResult = json.load(jsonData) + +# read the jobs that have been reported +try: + with open(REPORTED_JOBS_PATH) as jsonData: + reportedJobs = json.load(jsonData) +except: + reportedJobs = [] + +jsonResult = cmd.returnLiveJobs() # comment out this line to use test data + +for job in jsonResult["jobs"]: + jobId = job["id"] + if "trace" not in job: + report(jobId, "Can't find trace for the job") + continue + + lastLineOfTrace = job["trace"][-1] + (timeStr, msg) = lastLineOfTrace.split("|") + timestamp = parser.parse(timeStr) + action = msg.split()[0] + jobTimeout = job["timeout"] + + now = datetime.datetime.now() + elapsed = (now - timestamp).total_seconds() + if action == "running": + if elapsed > (jobTimeout + 120): + report(jobId, "Job should be timed out") + elif elapsed > 120: + report(jobId, "It's been too long since last trace") +# end of for loop + +# write troubled jobs found in this execution to file +try: + with open(REPORTED_JOBS_PATH, 'w') as outfile: + json.dump(troubleJobs, outfile) +except Exception as e: + writeFailure = "Failed to write to %s: %s\n" % (REPORTED_JOBS_PATH, e) + +# report trouble jobs AND maybe failure of writing to file +sendmail() + +exit() + diff --git a/tools/config_for_run_jobs.py b/tools/config_for_run_jobs.py new file mode 100644 index 00000000..80b86f76 --- /dev/null +++ b/tools/config_for_run_jobs.py @@ -0,0 +1,63 @@ +# This 
is a config file for run_jobs.py.
+# Change the file to fit your environment.
+# Please do NOT commit your changes unless
+# 1. There is a need for more configuration settings and
+# 2. You have made it known to Xiaolin Charlene Zang.
+
+class Config:
+    # The settings are listed in order, from the ones most likely to need a
+    # change to the least likely.
+
+    # YOUR course name
+    course = "your-name-experiment"
+    course = "czang-exp"
+
+    # YOUR root dir for course/lab definitions and handin (student submissions)
+    courseRoot = "/mnt/data/f16/"
+
+    # YOUR lab definitions. The index of the lab is given to run_jobs.py
+    labs = [
+        {"name": "cloudfscheckpoint2dedup", "handinSuffix": ".tar", "image": "746"},
+        {"name": "myftlcheckpoint1", "handinSuffix": ".cpp", "image": "746"}]
+
+    # Range of student submissions to run (sorted by student emails)
+    # If either is None, all student submissions are run, unless
+    # -r, -f, or -s is given to run_jobs.
+    firstStudentNum = 3  # start from index 3 (set to None for all students)
+    totalStudents = 1  # number of students to submit
+
+    firstStudentNum = None  # set to None for all students
+
+    # YOUR Tango container's root dir for submissions and output
+    tangoFileRoot = "/root/autolab-oneclick/server/tango_courselabs"
+
+    # YOUR Tango repo root (cloned from xyzisinus' Autolab github)
+    tangoDir = "/h/myname/Tango"
+    tangoDir = "/root/autolab-oneclick/server/Tango"
+
+    # IP of the tango container is usually computed automatically
+    tangoIP = ""
+
+    # INFO: Where tango and redis ports are defined
+    # In docker-compose.yml file (under parent dir of Tango), there can be:
+    '''
+    tango:
+      ports:
+        - '8600:8600'
+        - '6380:6379'
+    '''
+    # The first port pair is for tango. The port before ":" is on the host and
+    # the other (optional) inside the container if tango/redis are run in a
+    # container. The second line is for redis.
+    # Sometimes we run multiple tango/redis containers on the same host for
+    # separate experiments. To access the different tango/redis instances, we
+    # can give them different on-host port numbers, hence the need for the
+    # HostPort variables. A util script can reach the desired instance using
+    # those variables.
+
+    # Note: This variable is used by tools/util.py (run_jobs.py) only so far.
+    tangoHostPort = "host-port 8600"
+
+    # Note: This variable is used by tools/ec2Read.py only so far.
+    redisHostPort = 6379  # default
+
+# end of class Config
diff --git a/tools/ec2Read.py b/tools/ec2Read.py
new file mode 100644
index 00000000..eac13c04
--- /dev/null
+++ b/tools/ec2Read.py
@@ -0,0 +1,324 @@
+import os, sys, time, re, json, pprint, datetime
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from vmms.ec2SSH import Ec2SSH
+from preallocator import Preallocator
+from tangoObjects import TangoQueue
+from tangoObjects import TangoMachine
+from tango import TangoServer
+from config import Config
+import config_for_run_jobs
+import boto3
+import pytz
+import argparse
+
+# Read aws instances, Tango preallocator pools, etc.
+# Also serves as sample code for quick testing of Tango/VMMS functionalities.
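+#
+# Example invocations (a hypothetical session, run from the tools/ directory;
+# the flags are defined in CommandLine below):
+#   python ec2Read.py -l        # list running VMs and pools, and ping the VMs
+#   python ec2Read.py -L        # list all instances, whatever their state
+#   python ec2Read.py -D None   # terminate instances that lack a "Name" tag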
+ +class CommandLine(): + def __init__(self): + parser = argparse.ArgumentParser( + description='List AWS vms and preallocator pools') + parser.add_argument('-a', '--accessIdKeyUser', + help="aws access id, key and user, space separated") + parser.add_argument('-c', '--createVMs', action='store_true', + dest='createVMs', help="add a VM for each pool") + parser.add_argument('-C', '--createInstance', action='store_true', + dest='createInstance', help="create an instance without adding to a pool") + parser.add_argument('-d', '--destroyVMs', action='store_true', + dest='destroyVMs', help="destroy VMs and empty pools") + parser.add_argument('-D', '--instanceNameTags', nargs='+', + help="destroy instances by name tags or AWS ids (can be partial). \"None\" (case insensitive) deletes all instances without a \"Name\" tag") + parser.add_argument('-l', '--list', action='store_true', + dest='listVMs', help="list and ping live vms") + parser.add_argument('-L', '--listAll', action='store_true', + dest='listInstances', help="list all instances") + self.args = parser.parse_args() + +cmdLine = CommandLine() +argDestroyInstanceByNameTags = cmdLine.args.instanceNameTags +argListVMs = cmdLine.args.listVMs +argListAllInstances = cmdLine.args.listInstances +argDestroyVMs = cmdLine.args.destroyVMs +argCreateVMs = cmdLine.args.createVMs +argCreateInstance = cmdLine.args.createInstance +argAccessIdKeyUser = cmdLine.args.accessIdKeyUser + +def destroyVMs(): + vms = ec2.getVMs() + print "number of Tango VMs:", len(vms) + for vm in vms: + if vm.id: + print "destroy", nameToPrint(vm.name) + ec2.destroyVM(vm) + else: + print "VM not in Tango naming pattern:", nameToPrint(vm.name) + +def pingVMs(): + vms = ec2.getVMs() + print "number of Tango VMs:", len(vms) + for vm in vms: + if vm.id: + print "ping", nameToPrint(vm.name) + # Note: following call needs the private key file for aws to be + # at wherever SECURITY_KEY_PATH in config.py points to. + # For example, if SECURITY_KEY_PATH = '/root/746-autograde.pem', + # then the file should exist there. 
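+            # (waitVM pings the instance first, then retries ssh until it
+            # answers or the timeout expires; see waitVM in vmms/ec2SSH.py.)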
+            ec2.waitVM(vm, Config.WAITVM_TIMEOUT)
+        else:
+            print "VM not in Tango naming pattern:", nameToPrint(vm.name)
+
+local_tz = pytz.timezone(Config.AUTODRIVER_LOGGING_TIME_ZONE)
+def utc_to_local(utc_dt):
+    local_dt = utc_dt.replace(tzinfo=pytz.utc).astimezone(local_tz)
+    return local_dt.strftime("%Y%m%d-%H:%M:%S")
+
+# to test destroying instances without a "Name" tag
+def deleteNameTagForAllInstances():
+    instances = listInstances()
+    for instance in instances:
+        boto3connection.delete_tags(Resources=[instance["Instance"].id],
+                                    Tags=[{"Key": "Name"}])
+    print "Afterwards"
+    print "----------"
+    listInstances()
+
+# to test changing tags to keep the vm after test failure
+def changeTagForAllInstances():
+    instances = listInstances()
+    for inst in instances:
+        instance = inst["Instance"]
+        name = inst["Name"]
+        notes = "tag " + name + " deleted"
+        boto3connection.delete_tags(Resources=[instance.id],
+                                    Tags=[{"Key": "Name"}])
+        boto3connection.create_tags(Resources=[instance.id],
+                                    Tags=[{"Key": "Name", "Value": "failed-" + name},
+                                          {"Key": "Notes", "Value": notes}])
+
+    print "Afterwards"
+    print "----------"
+    listInstances()
+
+def listInstances(all=None):
+    nameAndInstances = []
+
+    filters=[]
+    instanceType = "all"
+    if not all:
+        filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
+        instanceType = "running"
+
+    for instance in boto3resource.instances.filter(Filters=filters):
+        nameAndInstances.append({"Name": ec2.getTag(instance.tags, "Name"),
+                                 "Instance": instance})
+
+    nameAndInstances.sort(key=lambda x: x["Name"])
+    print "number of", instanceType, "AWS instances:", len(nameAndInstances)
+
+    for item in nameAndInstances:
+        instance = item["Instance"]
+        launchTime = utc_to_local(instance.launch_time)
+        if instance.public_ip_address:
+            print("%s: %s %s %s %s" %
+                  (nameToPrint(item["Name"]), instance.id,
+                   launchTime, instance.state["Name"],
+                   instance.public_ip_address))
+        else:
+            print("%s: %s %s %s" %
+                  (nameToPrint(item["Name"]), instance.id,
+                   launchTime, instance.state["Name"]))
+
+        if instance.tags:
+            for tag in instance.tags:
+                if (tag["Key"] != "Name"):
+                    print("\t tag {%s: %s}" % (tag["Key"], tag["Value"]))
+        else:
+            print("\t No tags")
+
+        print "\t InstanceType:", instance.instance_type
+        """ useful sometimes
+        image = boto3resource.Image(instance.image_id)
+        print "\t ImageId:", image.image_id
+        for tag in image.tags:
+            print("\t\t image tag {%s: %s}" % (tag["Key"], tag["Value"]))
+        """
+
+    return nameAndInstances
+
+def listPools():
+    print "known AWS images:", ec2.img2ami.keys()
+    knownPools = server.preallocator.machines.keys()
+    print "Tango VM pools:", "" if knownPools else "None"
+
+    for key in knownPools:
+        pool = server.preallocator.getPool(key)
+        totalPool = pool["total"]
+        freePool = pool["free"]
+        totalPool.sort()
+        freePool.sort()
+        print "pool", nameToPrint(key), "total", len(totalPool), totalPool, freePool
+
+def nameToPrint(name):
+    return "[" + name + "]" if name else "[None]"
+
+# add one vm to each pool (image); the first image is also tried with an
+# explicit instance type
+def addVMs():
+    # Add a vm for each image and a vm for the first image plus instance type
+    instanceTypeTried = False
+    for key in ec2.img2ami.keys():
+        vm = TangoMachine(vmms="ec2SSH", image=key)
+        pool = server.preallocator.getPool(vm.pool)
+        currentCount = len(pool["total"]) if pool else 0
+        print "adding a vm into pool", nameToPrint(vm.pool), "current size", currentCount
+        server.preallocVM(vm, currentCount + 1)
+
+        if
instanceTypeTried: + continue + else: + instanceTypeTried = True + + vm = TangoMachine(vmms="ec2SSH", image=key+"+t2.small") + pool = server.preallocator.getPool(vm.pool) + currentCount = len(pool["total"]) if pool else 0 + print "adding a vm into pool", nameToPrint(vm.pool), "current size", currentCount + server.preallocVM(vm, currentCount + 1) + +def destroyRedisPools(): + for key in server.preallocator.machines.keys(): + print "clean up pool", key + server.preallocator.machines.set(key, [[], TangoQueue(key)]) + server.preallocator.machines.get(key)[1].make_empty() + +# END of function definitions # + +boto3connection = boto3.client("ec2", Config.EC2_REGION) +boto3resource = boto3.resource("ec2", Config.EC2_REGION) + +server = TangoServer() +ec2 = server.preallocator.vmms["ec2SSH"] +pools = ec2.img2ami + +if argDestroyInstanceByNameTags: + nameAndInstances = listInstances() + totalTerminated = [] + + matchingInstances = [] + for partialStr in argDestroyInstanceByNameTags: + if partialStr.startswith("i-"): # match instance id + for item in nameAndInstances: + if item["Instance"].id.startswith(partialStr): + matchingInstances.append(item) + else: + # part of "Name" tag or None to match instances without name tag + for item in nameAndInstances: + nameTag = ec2.getTag(item["Instance"].tags, "Name") + if nameTag and \ + (nameTag.startswith(partialStr) or nameTag.endswith(partialStr)): + matchingInstances.append(item) + elif not nameTag and partialStr == "None": + matchingInstances.append(item) + + # the loop above may generate duplicates in matchingInstances + terminatedInstances = [] + for item in matchingInstances: + if item["Instance"].id not in terminatedInstances: + boto3connection.terminate_instances(InstanceIds=[item["Instance"].id]) + terminatedInstances.append(item["Instance"].id) + + if terminatedInstances: + print "terminate %d instances matching query string \"%s\":" % \ + (len(terminatedInstances), argDestroyInstanceByNameTags) + for id in terminatedInstances: + print id + print "Afterwards" + print "----------" + listInstances() + else: + print "no instances matching query string \"%s\"" % argDestroyInstanceByNameTags + + exit() + +if argListAllInstances: + listInstances("all") + exit() + +if argListVMs: + listInstances() + listPools() + pingVMs() + exit() + +if argDestroyVMs: + destroyVMs() + destroyRedisPools() + print "Afterwards" + print "----------" + listInstances() + listPools() + exit() + +if argCreateVMs: + listInstances() + listPools() + addVMs() # add 1 vm for each image and each image plus instance type + listInstances() + listPools() + exit() + +# Create number of instances (no pool), some of them without name tag +# to test untagged stale machine cleanup ability in Tango. +# watch tango.log for the cleanup actions. +if argCreateInstance: + # The cleanup function is not active unless the application is + # jobManager. Therefore we start it manually here. 
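+    # (hasattr guards against vmms backends without setTimer4cleanup; the
+    # ec2SSH vmms defines it and arms it automatically only under jobManager.)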
+    if hasattr(ec2, 'setTimer4cleanup'):
+        print "start setTimer4cleanup function in vmms"
+        ec2.setTimer4cleanup()
+
+    i = 0
+    while True:
+        vm = TangoMachine(vmms="ec2SSH")
+        vm.id = int(datetime.datetime.utcnow().strftime('%s'))
+        vm.image = '746'
+        vm.pool = '746'
+        vm.name = ec2.instanceName(vm.id, vm.pool)
+        result = ec2.initializeVM(vm)
+        if result:
+            print "created: ", result.name, result.instance_id
+        else:
+            print "failed to create"
+            break
+
+        # delete the name tag for half of the instances
+        if i % 2 == 0:
+            boto3connection.delete_tags(Resources=[result.instance_id],
+                                        Tags=[{"Key": "Name"}])
+        i += 1
+        time.sleep(30)
+
+        if i > 20:
+            break
+
+    time.sleep(10000)
+    exit()
+
+# ec2WithKey can be used to test the case that tango_cli uses a
+# non-default aws access id and key
+if argAccessIdKeyUser:
+    if len(argAccessIdKeyUser.split()) != 3:
+        print "access id, key and user must be quoted and space separated"
+        exit()
+    (id, key, user) = argAccessIdKeyUser.split()
+    ec2WithKey = Ec2SSH(accessKeyId=id, accessKey=key, ec2User=user)
+    vm = TangoMachine(vmms="ec2SSH")
+    vm.id = int(2000)  # a high enough number to avoid collision
+    # to test a non-default access id/key, the aws image must have the key
+    # manually installed or allow the key to be installed by the aws service.
+    # the following assumes we have such an image with a "Name" tag "test01.img"
+    vm.pool = "test01"
+    ec2WithKey.initializeVM(vm)
+    ec2WithKey.waitVM(vm, Config.WAITVM_TIMEOUT)
+    listInstances()
+
+# Write combinations of ops not provided by the command line options here:
diff --git a/tools/run_jobs.py b/tools/run_jobs.py
new file mode 100644
index 00000000..f35a826c
--- /dev/null
+++ b/tools/run_jobs.py
@@ -0,0 +1,270 @@
+import os, re, glob, datetime, time
+
+from config_for_run_jobs import Config
+from util import Cmd
+from util import CommandLine
+from util import Lab
+import util
+
+# Drives existing student submissions to Tango.
+# Finds the course/lab at the specified location and submits work from the
+# handin directory. Then waits for the job output files.
+#
+# Use -h to show usage.
+# See config_for_run_jobs.py for configuration options.
+
+cfg = Config()
+cmdLine = CommandLine(cfg)
+cmd = Cmd(cfg, cmdLine)
+
+if cmdLine.args.jobs:
+    cmd.jobs()
+    exit()
+
+
+startTime = time.mktime(datetime.datetime.now().timetuple())
+outputFiles = []
+
+for labIndex in cmdLine.args.indecies:
+    if labIndex >= len(cfg.labs):
+        print("lab index %d is out of range" % labIndex)
+        exit(-1)
+
+# run the list of labs in the sequence given on the command line
+for labIndex in cmdLine.args.indecies:
+    lab = Lab(cfg, cmdLine, labIndex)
+
+    students = []
+    student2file = {}
+
+    # get student handin files, the last submission for each student,
+    # and make a map from email to useful attributes
+
+    # if the handin dir also has the output files from the past, use them
+    # as baseline. A crude test is to see if the number of output files is
+    # close to the number of handin files (within 10% difference).
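+    # Illustrative arithmetic (hypothetical counts): 47 output files for 50
+    # handins gives |47/50 - 1.0| = 0.06 < 0.1, so the old outputs are used as
+    # baseline; 30 outputs for 50 handins gives 0.4, so they are ignored.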
+ nOutputFiles = len(glob.glob(lab.handinOutputFileQuery)) + nHandinFiles = len(glob.glob(lab.handinFileQuery)) + checkHandinOutput = True if abs(nOutputFiles / float(nHandinFiles) - 1.0) < 0.1 else False + + for file in sorted(glob.glob(lab.handinFileQuery)): + baseName = file.split("/").pop() + matchObj = re.match(r'(.*)_([0-9]+)_(.*)', baseName, re.M|re.I) + email = matchObj.group(1) + versionStr = matchObj.group(2) + version = int(versionStr) + + withoutSuffix = baseName.replace(lab.handinSuffix, "") + outputFile = withoutSuffix + "_" + lab.name + ".txt" + jobName = lab.courseLab + "_" + withoutSuffix + + handinOutput = None + passed = None + if checkHandinOutput: + handinOutput = lab.handinDir + "/" + email + "_" + versionStr + lab.handinOutputFileSuffix + if os.path.isfile(handinOutput): + passed = True if util.outputOK(handinOutput) else False + else: + handinOutput = None + + # add newly seen student + if email not in students: + students.append(email) + + # if previous output is available, only use the submission that has matching output + if checkHandinOutput: + if email not in student2file or \ + (version > student2file[email]["version"] and \ + (handinOutput and student2file[email]["existingOutput"]) or + (not handinOutput and not student2file[email]["existingOutput"])) or \ + (not student2file[email]["existingOutput"] and handinOutput): + studentFile = {"result": passed, "existingOutput": handinOutput, # previous outcome + "version": version, "full": file, "base": baseName, "job": jobName, + "stripped": matchObj.group(3), "output": outputFile} + student2file[email] = studentFile + elif email not in student2file or version > student2file[email]["version"]: + studentFile = {"version": version, "full": file, "base": baseName, "job": jobName, + "stripped": matchObj.group(3), "output": outputFile} + student2file[email] = studentFile + # end of for loop in handin files + + # report pre-existing failures and missing output files + knownFailures = [] + outcomeUnknown = [] + if checkHandinOutput: + for student in students: + if student2file[student]["result"] == None: + outcomeUnknown.append(student) + elif not student2file[student]["result"]: + knownFailures.append(student) + if knownFailures: + print "#", len(knownFailures), "known failures" + for student in knownFailures: + print student, student2file[student]["existingOutput"] + if outcomeUnknown: + print "#", len(outcomeUnknown), "students without existing output files" + for student in outcomeUnknown: + print student + + # print the students and the indices + if cmdLine.args.list_students: + i = 0 + print ("# %d student handin for lab %s from %s" % + (len(student2file), lab.name, lab.handinFileQuery)) + for student in students: + print i, student, student2file[student] + i += 1 + exit() + + # submit all student works or a given range, or given student list, + # or all failed students + studentIndexList = [] + studentsToRun = [] + studentList = cmdLine.args.students + + firstStudentNum = cfg.firstStudentNum + totalStudents = cfg.totalStudents + + # look for failures from output or from lab's handin (with "-H" option) + if cmdLine.args.re_run or cmdLine.args.failures: + studentList = util.getRerunList(cfg, lab) + + if studentList or cmdLine.args.re_run or cmdLine.args.failures: + for studentToRun in studentList: + studentIndex = None + nMatches = 0 + index = 0 + for student in students: + if student.startswith(studentToRun): + studentIndex = index + nMatches += 1 + index += 1 + if nMatches != 1: + print "ERROR: no match or multiple 
matches found for", studentToRun
+                exit()
+            studentIndexList.append(studentIndex)
+            studentsToRun.append(studentToRun)
+
+    else:
+        if firstStudentNum is None or totalStudents is None:
+            firstStudentNum = 0
+            totalStudents = len(students)
+        studentIndexList = list(index for index in range (firstStudentNum, firstStudentNum + totalStudents))
+
+    # run students in a given order
+    studentIndexList.sort()
+    studentsToRun.sort()
+
+    print ("# Found total %d student submissions for lab %s" % (len(students), lab.name))
+    if cmdLine.args.failures:
+        print ("# %d failed submissions for lab %s from %s" %
+               (len(studentIndexList), lab.name, lab.outputFileQuery))
+        for index in studentIndexList:
+            print ("%3d: %s" % (index, students[index]))
+        continue  # move on to the next lab
+
+    if cmdLine.args.verbose:
+        print ("# Student submissions: %d" % len(studentIndexList))
+        for index in studentIndexList:
+            print ("%3d: %s" % (index, students[index]))
+    else:
+        print ("# Students to run: %d" % (len(studentIndexList)))
+
+    if len(studentIndexList) == 0:
+        print ("# No student submissions for lab %s" % lab.name)
+        continue  # move on to the next lab
+
+    cmd.info()
+    cmd.open(lab)
+
+    # load lab files
+    cmd.upload(lab, lab.makefile)
+    cmd.upload(lab, lab.autogradeTar)
+
+    # before sending the jobs, clean the existing output files.
+    # also collect the file locations for output file waiting.
+    for i in studentIndexList:
+        outputFile = lab.outputDir + "/" + student2file[students[i]]["output"]
+        outputFiles.append(outputFile)
+        try:
+            os.remove(outputFile)
+            print "# Delete existing output file:", outputFile
+        except OSError:
+            pass
+
+    # load and run student submissions
+    for i in studentIndexList:
+        print ("\n# Submit %s for lab %s" % (students[i], lab.name))
+        cmd.upload(lab, student2file[students[i]]["full"])
+        cmd.addJob(lab, student2file[students[i]])
+# end of main loop "cmdLine.args.indecies"
+
+if cmdLine.args.dry_run:
+    print "\nDry run done"
+    exit()
+
+print("\nNow waiting for %d output files..." % len(outputFiles))
+remainingFiles = list(outputFiles)
+numberRemaining = len(remainingFiles)
+loopDelay = 5
+badOutputFiles = []
+inconsistentResults = []
+noCompareResults = []
+justFinishedFiles = []
+
+while len(outputFiles) > 0:
+    time.sleep(loopDelay)
+
+    # if we check an output file for scores as soon as it shows up, the file
+    # may not be fully copied. So we check the files found in the last round.
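+    # For example, a file first seen in round N is only score-checked in
+    # round N+1, by which time the copy should have completed.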
+    for file in justFinishedFiles:
+        OK = util.outputOK(file)
+        if OK:
+            print("Output ready: %s" % file)
+        else:
+            badOutputFiles.append(file)
+            print("Output missing scores: %s" % file)
+
+        if checkHandinOutput:
+            matchObj = re.match(r'(.*)_[0-9]+_.*', os.path.basename(file), re.M|re.I)
+            email = matchObj.group(1)
+            if student2file[email]["result"] == None:
+                noCompareResults.append(file)
+                print("No existing result for comparison")
+            elif student2file[email]["result"] != OK:
+                inconsistentResults.append([student2file[email]["existingOutput"], file])
+                print("Inconsistent with existing result %s" % student2file[email]["existingOutput"])
+
+    justFinishedFiles = []
+    for file in remainingFiles:
+        if os.path.exists(file) and os.path.getmtime(file) > startTime:
+            justFinishedFiles.append(file)
+    remainingFiles = set(remainingFiles) - set(justFinishedFiles)
+    nFinished = numberRemaining - len(remainingFiles)
+    print("%d jobs finished in the last %d seconds" % (nFinished, loopDelay))
+    print("%d unfinished out of %d" % (len(remainingFiles), len(outputFiles)))
+    now = time.mktime(datetime.datetime.now().timetuple())
+    print("elapsed time: %s\n" % (str(datetime.timedelta(seconds = now - startTime))))
+
+    numberRemaining = len(remainingFiles)
+    if numberRemaining == 0 and not justFinishedFiles:
+        print "All output files are accounted for :))"
+        break
+
+if badOutputFiles:
+    print("Found %d output files without scores" % len(badOutputFiles))
+    for f in badOutputFiles:
+        print("Output without scores: %s" % f)
+
+if inconsistentResults:
+    print("Found %d inconsistent results" % len(inconsistentResults))
+    for r in inconsistentResults:
+        r0 = "with scores" if util.outputOK(r[0]) else "without scores"
+        r1 = "with scores" if util.outputOK(r[1]) else "without scores"
+        print("Existing(%s): %s new(%s): %s" % (r0, r[0], r1, r[1]))
+
+if noCompareResults:
+    print("Found %d results without an existing comparison" % len(noCompareResults))
+    for f in noCompareResults:
+        print("No comparison: %s" % f)
diff --git a/tools/util.py b/tools/util.py
new file mode 100644
index 00000000..a873163d
--- /dev/null
+++ b/tools/util.py
@@ -0,0 +1,169 @@
+import subprocess, os, argparse, glob, re, json
+
+class CommandLine():
+    def printLabs(self, name=None):
+        print ("available tests:")
+        print ("index\ttest")
+        i = 0
+        for lab in self.cfg.labs:
+            print ("%d\t%s" % (i, lab["name"]))
+            i += 1
+        print
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+        parser = argparse.ArgumentParser(description='Drive jobs to Tango',
+                                         usage=self.printLabs())
+        parser.add_argument('indecies', metavar='index', type=int, nargs='+',
+                            help="index of a test")
+        parser.add_argument('-s', '--students', metavar='student', nargs='+',
+                            help="student emails (can be partial)")
+        parser.add_argument('-f', '--failures', action='store_true',
+                            help="examine failures")
+        parser.add_argument('-r', '--re_run', action='store_true',
+                            help="re-run failed jobs")
+        parser.add_argument('-H', '--handin_records', action='store_true',
+                            help="examine failures or re-run jobs from handin records")
+        parser.add_argument('-l', '--list_students', action='store_true',
+                            help="list student submissions")
+        parser.add_argument('-j', '--jobs', action='store_true',
+                            help="list all jobs (test index ignored)")
+        parser.add_argument('-d', '--dry_run', action='store_true',
+                            help="dry_run")
+        parser.add_argument('-v', '--verbose', action='store_true',
+                            help="more info")
+        self.args = parser.parse_args()
+# end of class CommandLine
+
+# represents the attributes associated with a given lab
+class Lab:
+
def __init__(self, cfg, cmdLine, labIndex): + self.cfg = cfg + self.name = cfg.labs[labIndex]["name"] + self.handinSuffix = cfg.labs[labIndex]["handinSuffix"] + self.image = cfg.labs[labIndex]["image"] + self.courseLab = cfg.course + "." + self.name + self.courseLabDir = cfg.courseRoot + "/" + self.name + self.makefile = self.courseLabDir + "/" + "autograde-Makefile" + self.autogradeTar = self.courseLabDir + "/" + "autograde.tar" + + self.handinDir = "/".join([self.courseLabDir, "handin"]) + self.handinFileQuery = "/".join([self.handinDir, "*" + self.handinSuffix]) + self.handinOutputFileQuery = "/".join([self.handinDir, "*_autograde.txt"]) + self.handinOutputFileSuffix = "_" + self.name + "_autograde.txt" + + self.outputDir = "/".join([cfg.tangoFileRoot, + "test-" + self.courseLab, + "output"]) + self.outputFileQuery = self.outputDir + "/*" + self.name + ".txt" + + if cmdLine.args.handin_records: + self.outputFileQuery = self.courseLabDir + "/handin/*" + self.name + "_autograde.txt" + print "EXAM FAILURES from", self.outputFileQuery +# end of class Lab + +class Cmd: + def __init__(self, cfg, cmdLine): + self.cfg = cfg + self.cmdLine = cmdLine + outBytes = subprocess.check_output(["ps", "-auxw"]) + for line in outBytes.decode("utf-8").split("\n"): + if cfg.tangoHostPort in line: + argList = line.split() + for index, token in enumerate(argList): + if token == "-container-ip": + cfg.tangoIP = argList[index + 1] + if cfg.tangoIP == "": + print "ERROR: Cannot find tango server IP" + exit(-1) + + self.basic = "python " + cfg.tangoDir + "/clients/tango-cli.py" + self.basic += " -s " + cfg.tangoIP + " -P 8600" + " -k test" + + print "CMD BASE:", self.basic + #end of __init__ + + def run(self, cmd): # an internal util function + if self.cmdLine.args.dry_run: + print "DRY-RUN tango-cli", cmd + else: + print "EXEC tango-cli", cmd + os.system(self.basic + cmd) + print "=======================================" + + def runAndOutput(self, cmd): + print "EXEC-CAPTURE tango-cli", cmd + return os.popen(self.basic + cmd).read() + + def info(self): + self.run(" --info") + + def open(self, lab): + self.run(" --open -l " + lab.courseLab) + + def upload(self, lab, file): + self.run(" --upload --filename " + file + " -l " + lab.courseLab) + + def addJob(self, lab, studentFile): + myCmd = " --addJob --image " + lab.image + " -l " + lab.courseLab + myCmd += " --jobname job_" + studentFile["job"] + myCmd += " --outputFile " + studentFile["output"] + myCmd += " --infiles" + myCmd += " '{\"localFile\": \"%s\", \"destFile\": \"%s\"}' " % \ + (studentFile["base"], studentFile["stripped"]) + myCmd += " '{\"localFile\": \"autograde-Makefile\", \"destFile\": \"Makefile\"}' " + myCmd += " '{\"localFile\": \"autograde.tar\", \"destFile\": \"autograde.tar\"}' " + self.run(myCmd) + + def poll(self, lab, studentFile): + myCmd = " --poll -l " + lab.courseLab + self.run(myCmd + " --outputFile " + studentFile["output"]) + + def returnLiveJobs(self): + return json.loads(self.runAndOutput(" --jobs ").splitlines()[1]) + + def jobs(self): + result = json.loads(self.runAndOutput(" --jobs ").splitlines()[1]) + nJobs = len(result["jobs"]) + print "Waiting/running jobs:", nJobs + print json.dumps(result, indent=2, sort_keys=True) + print "=======================================" + + result = json.loads(self.runAndOutput(" --jobs --deadJobs 1 ").splitlines()[1]) + nJobs = len(result["jobs"]) + print "Completed jobs:", nJobs + print json.dumps(result, indent=2, sort_keys=True) +# end of class Cmd + +# =================== stand alone 
functions ======================
+
+# get student handin files or output files, assuming file names start with the student email
+def getStudent2file(lab, fileQuery):
+    files = sorted(glob.glob(fileQuery))  # files are sorted by student email
+    students = []
+    student2file = {}
+    student2version = {}
+
+    for f in files:
+        baseName = f.split("/").pop()
+        matchObj = re.match(r'(.*)_([0-9]+)_(.*)', baseName, re.M|re.I)
+        (email, version) = (matchObj.group(1), int(matchObj.group(2)))
+        if email not in students:
+            students.append(email)
+        if email not in student2version or version > student2version[email]:
+            student2version[email] = version
+            student2file[email] = f
+    return (students, student2file)
+
+def getRerunList(cfg, lab):
+    (students, student2file) = getStudent2file(lab, lab.outputFileQuery)
+
+    failedStudents = []
+    for s in students:
+        if "\"scores\":" not in open(student2file[s]).read():
+            failedStudents.append(s)
+
+    return failedStudents
+
+def outputOK(file):
+    return True if "\"scores\":" in open(file).read() else False
diff --git a/vmms/distDocker.py b/vmms/distDocker.py
index c5726176..8896c4ee 100644
--- a/vmms/distDocker.py
+++ b/vmms/distDocker.py
@@ -250,7 +250,7 @@ def runJob(self, vm, runTimeout, maxOutputFileSize):

    def copyOut(self, vm, destFile):
-        """ copyOut - Copy the autograder feedback from container to
+        """ copyOut - Copy the autodriver feedback from container to
        destFile on the Tango host. Then, destroy that container.
        Containers are never reused.
        """
diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py
index 303bf381..bf86838d 100644
--- a/vmms/ec2SSH.py
+++ b/vmms/ec2SSH.py
@@ -1,25 +1,27 @@
#
# ec2SSH.py - Implements the Tango VMMS interface to run Tango jobs on Amazon EC2.
#
-# This implementation uses the AWS EC2 SDK to manage the virtual machines and
-# ssh and scp to access them. The following excecption are raised back
-# to the caller:
-#
-# Ec2Exception - EC2 raises this if it encounters any problem
-# ec2CallError - raised by ec2Call() function
-#
+# It uses the boto3 SDK to manage the virtual machines and
+# ssh and scp to access them.
+
+import __main__
import subprocess
import os
import re
import time
import logging
+import datetime
+from threading import Timer
+import pytz

import config
-
-import boto
-from boto import ec2
from tangoObjects import TangoMachine
+import boto3
+from botocore.exceptions import ClientError
+
+### added to suppress boto XML output -- Jason Boles
+logging.getLogger('boto3').setLevel(logging.CRITICAL)
+logging.getLogger('botocore').setLevel(logging.CRITICAL)

def timeout(command, time_out=1):
    """ timeout - Run a unix command with a timeout. 
Return -1 on
@@ -49,7 +51,6 @@ def timeout(command, time_out=1):
    returncode = p.poll()
    return returncode

-
def timeoutWithReturnStatus(command, time_out, returnValue=0):
    """ timeoutWithReturnStatus - Run a Unix command with a timeout,
    until the expected value is returned by the command; On timeout,
@@ -71,44 +72,81 @@ def timeoutWithReturnStatus(command, time_out, returnValue=0):
                stderr=subprocess.STDOUT)
            return ret

-#
-# User defined exceptions
-#
-# ec2Call() exception
-
-
-class ec2CallError(Exception):
-    pass
-
-
class Ec2SSH:
    _SSH_FLAGS = ["-i", config.Config.SECURITY_KEY_PATH,
                  "-o", "StrictHostKeyChecking no",
                  "-o", "GSSAPIAuthentication no"]
+    _SECURITY_KEY_PATH_INDEX_IN_SSH_FLAGS = 1

-    def __init__(self, accessKeyId=None, accessKey=None):
+    def __init__(self, accessKeyId=None, accessKey=None, ec2User=None):
        """ log - logger for the instance
        connection - EC2Connection object that stores the connection
        info to the EC2 network
        instance - Instance object that stores information about the
        VM created
        """
+
+        # note: str.strip removes a *set* of characters, not a suffix;
+        # use splitext to drop the .py extension reliably
+        self.appName = os.path.splitext(os.path.basename(__main__.__file__))[0]
+        self.local_tz = pytz.timezone(config.Config.AUTODRIVER_LOGGING_TIME_ZONE)
+        self.log = logging.getLogger("Ec2SSH-" + str(os.getpid()))
+        self.log.info("init Ec2SSH in program %s" % self.appName)
+
        self.ssh_flags = Ec2SSH._SSH_FLAGS
-        if accessKeyId:
-            self.connection = ec2.connect_to_region(config.Config.EC2_REGION,
-                aws_access_key_id=accessKeyId, aws_secret_access_key=accessKey)
-            self.useDefaultKeyPair = False
-        else:
-            self.connection = ec2.connect_to_region(config.Config.EC2_REGION)
-            self.useDefaultKeyPair = True
-        self.log = logging.getLogger("Ec2SSH")
+        self.ec2User = ec2User if ec2User else config.Config.EC2_USER_NAME
+        self.useDefaultKeyPair = False if accessKeyId else True
+
+        self.img2ami = {}
+        images = []

-    def instanceName(self, id, name):
+        try:
+            self.boto3client = boto3.client("ec2", config.Config.EC2_REGION,
+                                            aws_access_key_id=accessKeyId,
+                                            aws_secret_access_key=accessKey)
+            self.boto3resource = boto3.resource("ec2", config.Config.EC2_REGION)
+
+            images = self.boto3resource.images.filter(Owners=["self"])
+        except Exception as e:
+            self.log.error("Ec2SSH init Failed: %s" % e)
+            raise  # serious error
+
+        # Note: By convention, all images usable by Tango must have a "Name" tag
+        # whose value is the image name, such as xyz or xyz.img (older form).
+        # xyz is also the preallocator pool name for vms using this image, if
+        # an instance type is not specified.
+
+        for image in images:
+            if image.tags:
+                for tag in image.tags:
+                    if tag["Key"] == "Name":
+                        if tag["Value"]:
+                            if tag["Value"] in self.img2ami:
+                                self.log.info("Ignore %s for duplicate name tag %s" %
+                                              (image.id, tag["Value"]))
+                            else:
+                                self.img2ami[tag["Value"]] = image
+                                self.log.info("Found image: %s with name tag %s" %
+                                              (image.id, tag["Value"]))
+
+        imageAMIs = [item.id for item in images]
+        taggedAMIs = [self.img2ami[key].id for key in self.img2ami]
+        ignoredAMIs = list(set(imageAMIs) - set(taggedAMIs))
+        if (len(ignoredAMIs) > 0):
+            self.log.info("Ignored images %s for a missing or ill-formed name tag" %
+                          str(ignoredAMIs))
+
+        if self.appName == "jobManager":
+            self.setTimer4cleanup()
+    # end of __init__
+
+    #
+    # VMMS helper methods
+    #
+
+    def instanceName(self, id, pool):
        """ instanceName - Constructs a VM instance name.  Always use
-        this function when you need a VM instance name. Never generate
-        instance names manually. 
+        this function when you need a VM instance name, or use vm.name
        """
-        return "%s-%d-%s" % (config.Config.PREFIX, id, name)
+        return "%s-%d-%s" % (config.Config.PREFIX, id, pool)

    def keyPairName(self, id, name):
        """ keyPairName - Constructs a unique key pair name.
@@ -120,9 +158,6 @@ def domainName(self, vm):
        instance.
        """
        return vm.domain_name
-    #
-    # VMMS helper methods
-    #

    def tangoMachineToEC2Instance(self, vm):
        """ tangoMachineToEC2Instance - returns an object with EC2 instance
@@ -131,40 +166,37 @@
        """
        ec2instance = dict()

-        memory = vm.memory  # in Kbytes
-        cores = vm.cores
-
-        if (cores == 1 and memory <= 613 * 1024):
-            ec2instance['instance_type'] = 't2.micro'
-        elif (cores == 1 and memory <= 1.7 * 1024 * 1024):
-            ec2instance['instance_type'] = 'm1.small'
-        elif (cores == 1 and memory <= 3.75 * 1024 * 1024):
-            ec2instance['instance_type'] = 'm3.medium'
-        elif (cores == 2):
-            ec2instance['instance_type'] = 'm3.large'
-        elif (cores == 4):
-            ec2instance['instance_type'] = 'm3.xlarge'
-        elif (cores == 8):
-            ec2instance['instance_type'] = 'm3.2xlarge'
-        else:
-            ec2instance['instance_type'] = config.Config.DEFAULT_INST_TYPE
+        # Note: Unlike the other vmms backends, the instance type is chosen
+        # from the optional instance type attached to the image name as
+        # "image+instance_type", such as my_course_image+t2.small.
+
+        ec2instance['instance_type'] = config.Config.DEFAULT_INST_TYPE
+        if vm.instance_type:
+            ec2instance['instance_type'] = vm.instance_type

-        ec2instance['ami'] = config.Config.DEFAULT_AMI
+        ec2instance['ami'] = self.img2ami[vm.image].id
+        self.log.info("tangoMachineToEC2Instance: %s" % str(ec2instance))

        return ec2instance

    def createKeyPair(self):
        # try to delete the key to avoid collision
        self.key_pair_path = "%s/%s.pem" % \
-            (config.Config.DYNAMIC_SECURITY_KEY_PATH, self.key_pair_name)
+            (config.Config.DYNAMIC_SECURITY_KEY_PATH,
+             self.key_pair_name)
        self.deleteKeyPair()
-        key = self.connection.create_key_pair(self.key_pair_name)
-        key.save(config.Config.DYNAMIC_SECURITY_KEY_PATH)
+        response = self.boto3client.create_key_pair(KeyName=self.key_pair_name)
+        keyFile = open(self.key_pair_path, "w+")
+        keyFile.write(response["KeyMaterial"])
+        os.chmod(self.key_pair_path, 0o600)  # readable only by owner
+        keyFile.close()
+
        # change the SSH_FLAG accordingly
-        self.ssh_flags[1] = self.key_pair_path
+        self.ssh_flags[Ec2SSH._SECURITY_KEY_PATH_INDEX_IN_SSH_FLAGS] = self.key_pair_path
+        return self.key_pair_path

    def deleteKeyPair(self):
-        self.connection.delete_key_pair(self.key_pair_name)
+        self.boto3client.delete_key_pair(KeyName=self.key_pair_name)
        # the key file may not exist; try to remove it anyway
        try:
            os.remove(self.key_pair_path)
@@ -174,27 +206,31 @@
    def createSecurityGroup(self):
        # Create the security group, which may already exist
        try:
-            security_group = self.connection.create_security_group(
-                config.Config.DEFAULT_SECURITY_GROUP,
-                "Autolab security group - allowing all traffic")
-            # All ports, all traffics, all ips
-            security_group.authorize(from_port=None,
-                to_port=None, ip_protocol='-1', cidr_ip='0.0.0.0/0')
-        except boto.exception.EC2ResponseError:
+            response = self.boto3client.create_security_group(
+                GroupName=config.Config.DEFAULT_SECURITY_GROUP,
+                Description="Autolab security group - allowing all traffic")
+            security_group_id = response['GroupId']
+            # all ports, all protocols, all source ips
+            self.boto3client.authorize_security_group_ingress(
+                GroupId=security_group_id,
+                IpPermissions=[{'IpProtocol': '-1',
+                                'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}])
+        except ClientError as e:
+            # the security group may have been created already
            pass

    #
    # VMMS API functions
    #

+    def
initializeVM(self, vm):
-        """ initializeVM - Tell EC2 to create a new VM instance.
+        """ initializeVM - Tell EC2 to create a new VM instance. Return None on failure.

        Returns a boto.ec2.instance.Instance object.
        """
        # Create the instance and obtain the reservation
+        newInstance = None
        try:
-            instanceName = self.instanceName(vm.id, vm.name)
+            vm.name = self.instanceName(vm.id, vm.pool)
            ec2instance = self.tangoMachineToEC2Instance(vm)
+            self.log.info("initializeVM: %s %s" % (vm.name, str(ec2instance)))
            # ensure that security group exists
            self.createSecurityGroup()
            if self.useDefaultKeyPair:
@@ -202,56 +238,79 @@
                self.key_pair_path = config.Config.SECURITY_KEY_PATH
            else:
                self.key_pair_name = self.keyPairName(vm.id, vm.name)
-                self.createKeyPair()
-
+                self.key_pair_path = self.createKeyPair()
+
+            reservation = self.boto3resource.create_instances(ImageId=ec2instance['ami'],
+                                                              InstanceType=ec2instance['instance_type'],
+                                                              KeyName=self.key_pair_name,
+                                                              SecurityGroups=[
+                                                                  config.Config.DEFAULT_SECURITY_GROUP],
+                                                              MaxCount=1,
+                                                              MinCount=1)
+
+            # Sleep for a while to prevent random transient errors observed
+            # when the instance is not available yet
+            time.sleep(config.Config.TIMER_POLL_INTERVAL)

-            reservation = self.connection.run_instances(
-                ec2instance['ami'],
-                key_name=self.key_pair_name,
-                security_groups=[
-                    config.Config.DEFAULT_SECURITY_GROUP],
-                instance_type=ec2instance['instance_type'])
+            newInstance = reservation[0]
+            if not newInstance:
+                raise ValueError("cannot find new instance for %s" % vm.name)

            # Wait for instance to reach 'running' state
-            state = -1
            start_time = time.time()
-            while state is not config.Config.INSTANCE_RUNNING:
+            while True:
+                # Note: You'd think we should be able to read the state from
+                # the instance, but that turns out not to work. So we round up
+                # all running instances and find our instance by instance id.
+
+                filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
+                instances = self.boto3resource.instances.filter(Filters=filters)
+                instanceRunning = False
+
+                newInstance.load()  # reload the state of the instance
+                for inst in instances.filter(InstanceIds=[newInstance.id]):
+                    self.log.debug("VM %s %s: is running" % (vm.name, newInstance.id))
+                    instanceRunning = True

-                for inst in self.connection.get_all_instances():
-                    if inst.id == reservation.id:
-                        newInstance = inst.instances.pop()
+                if instanceRunning:
+                    break

-                state = newInstance.state_code
-                self.log.debug(
-                    "VM %s: Waiting to reach 'running' state. 
Current state: %s (%d)" %
-                    (instanceName, newInstance.state, state))
+                if time.time() - start_time > config.Config.INITIALIZEVM_TIMEOUT:
+                    raise ValueError("VM %s %s: timeout (%d seconds) before reaching 'running' state" %
+                                     (vm.name, newInstance.id, config.Config.INITIALIZEVM_TIMEOUT))
+
+                self.log.debug("VM %s %s: Waiting to reach 'running' from 'pending'" % (vm.name, newInstance.id))
                time.sleep(config.Config.TIMER_POLL_INTERVAL)
-            elapsed_secs = time.time() - start_time
-            if (elapsed_secs > config.Config.INITIALIZEVM_TIMEOUT):
-                self.log.debug(
-                    "VM %s: Did not reach 'running' state before timeout period of %d" %
-                    (instanceName, config.Config.TIMER_POLL_INTERVAL))
+            # end of while loop
+
+            # tag the instance
+            self.boto3resource.create_tags(Resources=[newInstance.id],
+                                           Tags=[{"Key": "Name", "Value": vm.name}])
+            self.log.info("new instance %s created with name tag %s" %
+                          (newInstance.id, vm.name))

            self.log.info(
                "VM %s | State %s | Reservation %s | Public DNS Name %s | Public IP Address %s" %
-                (instanceName,
+                (vm.name,
                 newInstance.state,
-                 reservation.id,
+                 reservation,
                 newInstance.public_dns_name,
-                 newInstance.ip_address))
+                 newInstance.public_ip_address))

            # Save domain and id assigned by EC2 in vm object
-            vm.domain_name = newInstance.ip_address
-            vm.ec2_id = newInstance.id
-            # Assign name to EC2 instance
-            self.connection.create_tags(
-                [newInstance.id], {"Name": instanceName})
-            self.log.debug("VM %s: %s" % (instanceName, newInstance))
+            vm.domain_name = newInstance.public_ip_address
+            vm.instance_id = newInstance.id
+            self.log.debug("VM %s: %s" % (vm.name, newInstance))
            return vm

        except Exception as e:
-            self.log.debug("initializeVM Failed: %s" % e)
-
+            self.log.error("initializeVM Failed for vm %s: %s" % (vm.name, e))
+            if newInstance:
+                try:
+                    self.boto3resource.instances.filter(InstanceIds=[newInstance.id]).terminate()
+                except Exception as e:
+                    self.log.error("Exception handling failed for %s: %s" % (vm.name, e))
+                return None
            return None

    def waitVM(self, vm, max_secs):
@@ -261,11 +320,18 @@
        VM is a boto.ec2.instance.Instance object.
        """

+        self.log.info("WaitVM: %s %s" % (vm.name, vm.instance_id))
+
+        # test if the vm is still an instance
+        if not self.existsVM(vm):
+            self.log.info("VM %s: no longer an instance" % vm.name)
+            return -1
+
        # First, wait for ping to the vm instance to work
        instance_down = 1
-        instanceName = self.instanceName(vm.id, vm.name)
        start_time = time.time()
        domain_name = self.domainName(vm)
+        self.log.info("WaitVM: pinging %s %s" % (domain_name, vm.name))
        while instance_down:
            instance_down = subprocess.call("ping -c 1 %s" % (domain_name),
                                            shell=True,
@@ -278,6 +344,7 @@
                time.sleep(config.Config.TIMER_POLL_INTERVAL)
                elapsed_secs = time.time() - start_time
                if (elapsed_secs > max_secs):
+                    self.log.debug("WAITVM_TIMEOUT: %s" % vm.id)
                    return -1

        # The ping worked, so now wait for SSH to work before
@@ -290,19 +357,18 @@
            # Give up if the elapsed time exceeds the allowable time
            if elapsed_secs > max_secs:
                self.log.info(
-                    "VM %s: SSH timeout after %d secs" %
-                    (instanceName, elapsed_secs))
+                    "VM %s: SSH timeout after %d secs" % (vm.name, elapsed_secs))
                return -1

            # If the call to ssh returns timeout (-1) or ssh error
            # (255), then success. Otherwise, keep trying until we run
            # out of time. 
+ ret = timeout(["ssh"] + self.ssh_flags + - ["%s@%s" % (config.Config.EC2_USER_NAME, domain_name), + ["%s@%s" % (self.ec2User, domain_name), "(:)"], max_secs - elapsed_secs) - self.log.debug("VM %s: ssh returned with %d" % - (instanceName, ret)) + self.log.debug("VM %s: ssh returned with %d" % (vm.name, ret)) if (ret != -1) and (ret != 255): return 0 @@ -337,18 +403,28 @@ def runJob(self, vm, runTimeout, maxOutputFileSize): redirect output to file "output". """ domain_name = self.domainName(vm) - self.log.debug("runJob: Running job on VM %s" % - self.instanceName(vm.id, vm.name)) - # Setting ulimits for VM and running job - runcmd = "/usr/bin/time --output=time.out autodriver -u %d -f %d -t \ - %d -o %d autolab &> output" % (config.Config.VM_ULIMIT_USER_PROC, - config.Config.VM_ULIMIT_FILE_SIZE, - runTimeout, - maxOutputFileSize) + self.log.debug("runJob: Running job on VM %s" % vm.name) + + # Setting arguments for VM and running job + runcmd = "/usr/bin/time --output=time.out autodriver \ + -u %d -f %d -t %d -o %d " % ( + config.Config.VM_ULIMIT_USER_PROC, + config.Config.VM_ULIMIT_FILE_SIZE, + runTimeout, + maxOutputFileSize) + if hasattr(config.Config, 'AUTODRIVER_LOGGING_TIME_ZONE') and \ + config.Config.AUTODRIVER_LOGGING_TIME_ZONE: + runcmd = runcmd + ("-z %s " % config.Config.AUTODRIVER_LOGGING_TIME_ZONE) + if hasattr(config.Config, 'AUTODRIVER_TIMESTAMP_INTERVAL') and \ + config.Config.AUTODRIVER_TIMESTAMP_INTERVAL: + runcmd = runcmd + ("-i %d " % config.Config.AUTODRIVER_TIMESTAMP_INTERVAL) + runcmd = runcmd + "autolab &> output" + + # runTimeout * 2 is a conservative estimate. + # autodriver handles timeout on the target vm. ret = timeout(["ssh"] + self.ssh_flags + ["%s@%s" % (config.Config.EC2_USER_NAME, domain_name), runcmd], runTimeout * 2) return ret - # runTimeout * 2 is a temporary hack. 
The driver will handle the timout def copyOut(self, vm, destFile): """ copyOut - Copy the file output on the VM to the file @@ -395,50 +471,164 @@ def copyOut(self, vm, destFile): def destroyVM(self, vm): """ destroyVM - Removes a VM from the system """ - ret = self.connection.terminate_instances(instance_ids=[vm.ec2_id]) - # delete dynamically created key - if not self.useDefaultKeyPair: - self.deleteKeyPair() - return ret + + self.log.info("destroyVM: %s %s %s %s" % + (vm.instance_id, vm.name, vm.keepForDebugging, vm.notes)) + + try: + # Keep the vm and mark with meaningful tags for debugging + if hasattr(config.Config, 'KEEP_VM_AFTER_FAILURE') and \ + config.Config.KEEP_VM_AFTER_FAILURE and vm.keepForDebugging: + self.log.info("Will keep VM %s for further debugging" % vm.name) + instance = self.boto3resource.Instance(vm.instance_id) + # delete original name tag and replace it with "failed-xyz" + # add notes tag for test name + tag = self.boto3resource.Tag(vm.instance_id, "Name", vm.name) + if tag: + tag.delete() + instance.create_tags(Tags=[{"Key": "Name", "Value": "failed-" + vm.name}]) + instance.create_tags(Tags=[{"Key": "Notes", "Value": vm.notes}]) + return + + self.boto3resource.instances.filter(InstanceIds=[vm.instance_id]).terminate() + # delete dynamically created key + if not self.useDefaultKeyPair: + self.deleteKeyPair() + + except Exception as e: + self.log.error("destroyVM init Failed: %s for vm %s" % (e, vm.instance_id)) + pass def safeDestroyVM(self, vm): return self.destroyVM(vm) + # return None or tag value if key exists + def getTag(self, tagList, tagKey): + if tagList: + for tag in tagList: + if tag["Key"] == tagKey: + return tag["Value"] + return None + def getVMs(self): - """ getVMs - Returns the complete list of VMs on this account. Each + """ getVMs - Returns the running or pending VMs on this account. Each list entry is a boto.ec2.instance.Instance object. """ - # TODO: Find a way to return vm objects as opposed ec2 instance - # objects. - instances = list() - for i in self.connection.get_all_instances(): - if i.id is not config.Config.TANGO_RESERVATION_ID: - inst = i.instances.pop() - if inst.state_code is config.Config.INSTANCE_RUNNING: - instances.append(inst) - - vms = list() - for inst in instances: - vm = TangoMachine() - vm.ec2_id = inst.id - vm.name = str(inst.tags.get('Name')) - self.log.debug('getVMs: Instance - %s, EC2 Id - %s' % - (vm.name, vm.ec2_id)) - vms.append(vm) - - return vms + + try: + vms = list() + filters=[{'Name': 'instance-state-name', 'Values': ['running', 'pending']}] + + for inst in self.boto3resource.instances.filter(Filters=filters): + vm = TangoMachine() # make a Tango internal vm structure + vm.instance_id = inst.id + vm.id = None # the serial number as in inst name PREFIX-serial-IMAGE + vm.domain_name = None + + instName = self.getTag(inst.tags, "Name") + # Name tag is the standard form of prefix-serial-image + if not (instName and re.match("%s-" % config.Config.PREFIX, instName)): + self.log.debug('getVMs: Instance id %s skipped' % vm.instance_id) + continue # instance without name tag or proper prefix + + vm.id = int(instName.split("-")[1]) + vm.pool = instName.split("-")[2] + vm.name = instName + if inst.public_ip_address: + vm.domain_name = inst.public_ip_address + vms.append(vm) + + self.log.debug('getVMs: Instance id %s, name %s' % + (vm.instance_id, vm.name)) + return vms + + except Exception as e: + self.log.debug("getVMs Failed: %s" % e) def existsVM(self, vm): - """ existsVM - Checks whether a VM exists in the vmms. 
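getVMs above leans on the Name-tag convention PREFIX-serial-IMAGE. A tiny illustrative parser of that convention (the helper and the sample tag are hypothetical, not from this patch):

    import re

    def parse_name_tag(inst_name, prefix):
        # Returns (serial, pool) if the tag matches "<prefix>-<serial>-<pool>",
        # else None -- mirroring the skip logic in getVMs
        if not (inst_name and re.match("%s-" % prefix, inst_name)):
            return None
        parts = inst_name.split("-")
        return int(parts[1]), parts[2]

    # e.g. parse_name_tag("tango-42-rhel", "tango") == (42, "rhel")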
+ """ existsVM - Checks whether a VM exists in the vmms. Internal use. """ - instances = self.connection.get_all_instances() - for inst in instances: - if inst.instances[0].id is vm.ec2_id: - return True + filters=[{'Name': 'instance-state-name', 'Values': ['running']}] + instances = self.boto3resource.instances.filter(Filters=filters) + for inst in instances.filter(InstanceIds=[vm.instance_id]): + self.log.debug("VM %s %s: exists and running" % (vm.instance_id, vm.name)) + return True return False def getImages(self): - """ getImages - return a constant; actually use the ami specified in config + """ getImages - return a constant; actually use the ami specified in config """ - return ["default.img"] + self.log.info("getImages: %s" % str(list(self.img2ami.keys()))) + return list(self.img2ami.keys()) + + def setTimer4cleanup(self): + # start a timer to cleanup stale vms + t = Timer(60, self.cleanupUntaggedStaleVMs) + t.daemon = True # timer thread will not hold off process termination + t.start() + + def cleanupUntaggedStaleVMs(self): + self.log.info("cleanupUntaggedStaleVMs") + + nameAndInstances = [] + filters=[{'Name': 'instance-state-name', 'Values': ['running', 'pending']}] + instanceType = 'running or pending' + + try: + instances = self.boto3resource.instances.filter(Filters=filters) + for instance in self.boto3resource.instances.filter(Filters=filters): + creationTime = instance.launch_time + localCreationTime = creationTime.replace(tzinfo=pytz.utc).astimezone(self.local_tz) + launchTime = localCreationTime.strftime("%Y%m%d-%H:%M:%S") + nowTime = datetime.datetime.utcnow() + age = int((nowTime.replace(tzinfo=pytz.utc) - creationTime.replace(tzinfo=pytz.utc)).total_seconds()) + nameAndInstances.append({"Name": self.getTag(instance.tags, "Name"), + "launchTime": launchTime, + "age": age, + "Instance": instance}) + self.log.info("number of running/pending instances: %d" % len(nameAndInstances)) + + nameNone = [] + named = [] + for item in nameAndInstances: + if item["Name"]: + named.append(item) + else: + nameNone.append(item) + + staleSet = [] + nameNone.sort(key=lambda x: x["age"], reverse=True) # oldest first + for item in nameNone: + instance = item["Instance"] + stale = "" + if item["age"] > config.Config.INITIALIZEVM_TIMEOUT * 2: # multiply 2 to be conservative + staleSet.append(item) + stale = "(STALE)" + self.log.info("[%s]: %s, age: %s, launch time: %s, state: %s %s" % + (item["Name"], instance.id, item["age"], + item["launchTime"], instance.state["Name"], stale)) + + named.sort(key=lambda x: x["Name"]) + for item in named: + instance = item["Instance"] + self.log.info("[%s]: %s, age: %s, launch time: %s, state: %s" % + (item["Name"], instance.id, item["age"], + item["launchTime"], instance.state["Name"])) + + # Delete VMs. 
Note that we do nothing to the pools because + # untagged VMs can't enter a pool + for item in staleSet: + instance = item["Instance"] + vm = TangoMachine(vmms="ec2SSH") + vm.instance_id = instance.id + vm.name = None + self.log.info("cleanup untagged stale instance %s, age: %s, launch time: %s" % + (instance.id, item["age"], item["launchTime"])) + self.destroyVM(vm) + + except Exception as e: + self.log.debug("cleanupUntaggedStaleVMs exception: %s" % e) + + self.setTimer4cleanup() # set the next timer interval + # end of cleanupUntaggedStaleVMs() diff --git a/vmms/localDocker.py b/vmms/localDocker.py index 45b54145..45565aec 100644 --- a/vmms/localDocker.py +++ b/vmms/localDocker.py @@ -155,7 +155,7 @@ def runJob(self, vm, runTimeout, maxOutputFileSize): def copyOut(self, vm, destFile): - """ copyOut - Copy the autograder feedback from container to + """ copyOut - Copy the autodriver feedback from container to destFile on the Tango host. Then, destroy that container. Containers are never reused. """ diff --git a/vmms/tashiSSH.py b/vmms/tashiSSH.py index ea05e114..af9c567a 100644 --- a/vmms/tashiSSH.py +++ b/vmms/tashiSSH.py @@ -337,7 +337,8 @@ def destroyVM(self, vm): """ destroyVM - Removes a VM from the system """ ret = self.tashiCall("destroyVm", [vm.instance_id]) - return ret + self.log.debug("Destroying VM %s status %s" % (vm.instance_id, ret)) + return def safeDestroyVM(self, vm): """ safeDestroyVM - More robust version of destroyVM. diff --git a/worker.py b/worker.py index e7ffec25..43a8da01 100644 --- a/worker.py +++ b/worker.py @@ -35,32 +35,25 @@ def __init__(self, job, vmms, jobQueue, preallocator, preVM): self.preallocator = preallocator self.preVM = preVM threading.Thread.__init__(self) - self.log = logging.getLogger("Worker") + self.log = logging.getLogger("Worker-" + str(os.getpid())) # # Worker helper functions # - def detachVM(self, return_vm=False, replace_vm=False): + def detachVM(self, return_vm=False): """ detachVM - Detach the VM from this worker. The options are to return it to the pool's free list (return_vm), destroy it - (not return_vm), and if destroying it, whether to replace it - or not in the pool (replace_vm). The worker must always call - this function before returning. + (if not return_vm). """ # job-owned instance, simply destroy after job is completed if self.job.accessKeyId: self.vmms.safeDestroyVM(self.job.vm) elif return_vm: + # put vm into free pool. may destroy it if free pool is over low water mark self.preallocator.freeVM(self.job.vm) else: self.vmms.safeDestroyVM(self.job.vm) - if replace_vm: - self.preallocator.createVM(self.job.vm) - - # Important: don't remove the VM from the pool until its - # replacement has been created. Otherwise there is a - # potential race where the job manager thinks that the - # pool is empty and creates a spurious vm. + self.log.info("removeVM %s" % self.job.vm.id); self.preallocator.removeVM(self.job.vm) def rescheduleJob(self, hdrfile, ret, err): @@ -68,14 +61,6 @@ def rescheduleJob(self, hdrfile, ret, err): of a system error, such as a VM timing out or a connection failure. 
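Before the worker.py changes, one pattern from the ec2SSH.py hunks above is worth calling out: setTimer4cleanup and cleanupUntaggedStaleVMs re-arm each other, giving a periodic background task without a dedicated thread loop. A standalone sketch of that pattern (the helper name is illustrative; unlike this sketch, the patch re-arms outside any try/finally):

    from threading import Timer

    def schedule_periodic(interval_secs, task):
        def run_once():
            try:
                task()
            finally:
                schedule_periodic(interval_secs, task)  # re-arm for the next pass
        t = Timer(interval_secs, run_once)
        t.daemon = True  # do not hold off process termination
        t.start()

    # e.g. schedule_periodic(60, cleanup), where cleanup is any zero-argument callable

Wrapping the task in try/finally guarantees the next pass is scheduled even if one pass raises; the patch achieves the same effect by catching all exceptions inside cleanupUntaggedStaleVMs before calling setTimer4cleanup again.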
""" - self.log.error("Job %s:%d failed: %s" % - (self.job.name, self.job.id, err)) - self.job.appendTrace( - "%s|Job %s:%d failed: %s" % - (datetime.now().ctime(), - self.job.name, - self.job.id, - err)) # Try a few times before giving up if self.job.retries < Config.JOB_RETRIES: @@ -83,7 +68,7 @@ def rescheduleJob(self, hdrfile, ret, err): os.remove(hdrfile) except OSError: pass - self.detachVM(return_vm=False, replace_vm=True) + self.detachVM(return_vm=False) self.jobQueue.unassignJob(self.job.id) # Here is where we give up @@ -103,31 +88,32 @@ def rescheduleJob(self, hdrfile, ret, err): ret["copyout"])) self.catFiles(hdrfile, self.job.outputFile) - self.detachVM(return_vm=False, replace_vm=True) + self.detachVM(return_vm=False) self.notifyServer(self.job) def appendMsg(self, filename, msg): """ appendMsg - Append a timestamped Tango message to a file """ f = open(filename, "a") - f.write("Autograder [%s]: %s\n" % (datetime.now().ctime(), msg)) + f.write("Autolab [%s]: %s\n" % (datetime.now().ctime(), msg)) f.close() def catFiles(self, f1, f2): """ catFiles - cat f1 f2 > f2, where f1 is the Tango header and f2 is the output from the Autodriver """ - self.appendMsg(f1, "Here is the output from the autograder:\n---") + self.appendMsg(f1, "Output of autodriver from grading VM:\n") (wfd, tmpname)=tempfile.mkstemp(dir=os.path.dirname(f2)) wf=os.fdopen(wfd, "a") with open(f1, "rb") as f1fd: shutil.copyfileobj(f1fd, wf) - # f2 may not exist if autograder failed + # f2 may not exist if autodriver failed try: with open(f2, "rb") as f2fd: shutil.copyfileobj(f2fd, wf) - except OSError: - pass + except IOError: + wf.write("NO OUTPUT FILE\n") + wf.close() os.rename(tmpname, f2) os.remove(f1) @@ -149,6 +135,30 @@ def notifyServer(self, job): except Exception as e: self.log.debug("Error in notifyServer: %s" % str(e)) + def afterJobExecution(self, hdrfile, msg, returnVM): + self.jobQueue.makeDead(self.job.id, msg) + + # Update the text that users see in the autodriver output file + self.appendMsg(hdrfile, msg) + self.catFiles(hdrfile, self.job.outputFile) + os.chmod(self.job.outputFile, 0o644) + + # Thread exit after termination + self.detachVM(return_vm=returnVM) + self.notifyServer(self.job) + return + + def jobLogAndTrace(self, stageMsg, vm, status=None): + msg = stageMsg + " %s for job %s:%d" % (vm.name, self.job.name, self.job.id) + + if (status != None): + if (status == 0): + msg = "done " + msg + else: + msg = "failed " + msg + " (status=%d)" % status + self.log.info(msg) + self.job.appendTrace(msg) + # # Main worker function # @@ -165,6 +175,7 @@ def run(self): self.log.debug("Run worker") vm = None + msg = "" # Header message for user hdrfile = tempfile.mktemp() @@ -173,57 +184,27 @@ def run(self): # Assigning job to a preallocated VM if self.preVM: # self.preVM: - self.log.debug("Assigning job to preallocated VM") self.job.vm = self.preVM self.job.updateRemote() - self.log.info("Assigned job %s:%d existing VM %s" % - (self.job.name, self.job.id, - self.vmms.instanceName(self.preVM.id, - self.preVM.name))) - self.job.appendTrace("%s|Assigned job %s:%d existing VM %s" % - (datetime.now().ctime(), - self.job.name, self.job.id, - self.vmms.instanceName(self.preVM.id, - self.preVM.name))) - self.log.debug("Assigned job to preallocated VM") + self.jobLogAndTrace("assigned VM (preallocated)", self.preVM) + # Assigning job to a new VM else: - self.log.debug("Assigning job to a new VM") - self.job.vm.id = self.job.id + self.job.vm.id = self.job.id # xxxXXX??? 
don't know how this works self.job.updateRemote() - self.log.info("Assigned job %s:%d new VM %s" % - (self.job.name, self.job.id, - self.vmms.instanceName(self.job.vm.id, - self.job.vm.name))) - self.job.appendTrace( - "%s|Assigned job %s:%d new VM %s" % - (datetime.now().ctime(), - self.job.name, - self.job.id, - self.vmms.instanceName( - self.job.vm.id, - self.job.vm.name))) - # Host name returned from EC2 is stored in the vm object self.vmms.initializeVM(self.job.vm) - self.log.debug("Asigned job to a new VM") + self.jobLogAndTrace("assigned VM (just initialized)", self.job.vm) vm = self.job.vm + returnVM = True # Wait for the instance to be ready - self.log.debug("Job %s:%d waiting for VM %s" % - (self.job.name, self.job.id, - self.vmms.instanceName(vm.id, vm.name))) - self.job.appendTrace("%s|Job %s:%d waiting for VM %s" % - (datetime.now().ctime(), - self.job.name, self.job.id, - self.vmms.instanceName(vm.id, vm.name))) - self.log.debug("Waiting for VM") + self.jobLogAndTrace("waiting for VM", vm) ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) - - self.log.debug("Waited for VM") + self.jobLogAndTrace("waiting for VM", vm, ret["waitvm"]) # If the instance did not become ready in a reasonable # amount of time, then reschedule the job, detach the VM, @@ -239,94 +220,58 @@ def run(self): # Thread Exit after waitVM timeout return - self.log.info("VM %s ready for job %s:%d" % - (self.vmms.instanceName(vm.id, vm.name), - self.job.name, self.job.id)) - self.job.appendTrace("%s|VM %s ready for job %s:%d" % - (datetime.now().ctime(), - self.vmms.instanceName(vm.id, vm.name), - self.job.name, self.job.id)) - # Copy input files to VM + self.jobLogAndTrace("copying to VM", vm) ret["copyin"] = self.vmms.copyIn(vm, self.job.input) + self.jobLogAndTrace("copying to VM", vm, ret["copyin"]) if ret["copyin"] != 0: Config.copyin_errors += 1 - self.log.info("Input copied for job %s:%d [status=%d]" % - (self.job.name, self.job.id, ret["copyin"])) - self.job.appendTrace("%s|Input copied for job %s:%d [status=%d]" % - (datetime.now().ctime(), - self.job.name, - self.job.id, ret["copyin"])) + msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) + self.afterJobExecution(hdrfile, msg, returnVM) + return # Run the job on the virtual machine + self.jobLogAndTrace("running on VM", vm) ret["runjob"] = self.vmms.runJob( vm, self.job.timeout, self.job.maxOutputFileSize) + self.jobLogAndTrace("running on VM", vm, ret["runjob"]) + # runjob may have failed. but go on with copyout to get the output if any + + # Copy the output back, even if runjob has failed + self.jobLogAndTrace("copying from VM", vm) + ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) + self.jobLogAndTrace("copying from VM", vm, ret["copyout"]) + + # handle failure(s) of runjob and/or copyout. runjob error takes priority. if ret["runjob"] != 0: Config.runjob_errors += 1 - if ret["runjob"] == -1: - Config.runjob_timeouts += 1 - self.log.info("Job %s:%d executed [status=%s]" % - (self.job.name, self.job.id, ret["runjob"])) - self.job.appendTrace("%s|Job %s:%d executed [status=%s]" % - (datetime.now().ctime(), - self.job.name, self.job.id, - ret["runjob"])) - - # Copy the output back. 
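The branching that follows resolves the runjob and copyout statuses into the single user-visible message, with runjob errors taking priority. Restated as a standalone function (illustrative only, not part of the patch):

    def resolve_msg(runjob, copyout, timeout_secs):
        # runjob status takes priority over copyout status
        if runjob != 0:
            if runjob == 1:  # should never happen
                return "Error: Autodriver usage error"
            if runjob in (-1, 2):  # both values indicate a timeout
                return "Error: Job timed out. timeout setting: %d seconds" % timeout_secs
            if runjob == 3:  # EXIT_OSERROR in Autodriver
                return "Error: OS error while running job on VM"
            return "Error: Unknown autodriver error (status=%d)" % runjob
        if copyout != 0:
            return "Error: Copy out from VM failed (status=%d)" % copyout
        return "Success: Autodriver returned normally"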
- ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) - if ret["copyout"] != 0: - Config.copyout_errors += 1 - self.log.info("Output copied for job %s:%d [status=%d]" % - (self.job.name, self.job.id, ret["copyout"])) - self.job.appendTrace("%s|Output copied for job %s:%d [status=%d]" - % (datetime.now().ctime(), - self.job.name, - self.job.id, ret["copyout"])) - - # Job termination. Notice that Tango considers - # things like runjob timeouts and makefile errors to be - # normal termination and doesn't reschedule the job. - self.log.info("Success: job %s:%d finished" % - (self.job.name, self.job.id)) - - # Move the job from the live queue to the dead queue - # with an explanatory message - msg = "Success: Autodriver returned normally" - (returnVM, replaceVM) = (True, False) - if ret["copyin"] != 0: - msg = "Error: Copy in to VM failed (status=%d)" % ( - ret["copyin"]) - elif ret["runjob"] != 0: if ret["runjob"] == 1: # This should never happen - msg = "Error: Autodriver usage error (status=%d)" % ( - ret["runjob"]) - elif ret["runjob"] == 2: - msg = "Error: Job timed out after %d seconds" % ( + msg = "Error: Autodriver usage error" + elif ret["runjob"] == -1 or ret["runjob"] == 2: # both are timeouts + Config.runjob_timeouts += 1 + msg = "Error: Job timed out. timeout setting: %d seconds" % ( self.job.timeout) - elif (ret["runjob"] == 3): # EXIT_OSERROR in Autodriver + elif ret["runjob"] == 3: # EXIT_OSERROR in Autodriver # Abnormal job termination (Autodriver encountered an OS # error). Assume that the VM is damaged. Destroy this VM # and do not retry the job since the job may have damaged # the VM. msg = "Error: OS error while running job on VM" - (returnVM, replaceVM) = (False, True) + returnVM = False + # doNotDestroy, combined with KEEP_VM_AFTER_FAILURE, will + # set the vm aside for further investigation after failure. + self.job.vm.keepForDebugging = True + self.job.vm.notes = str(self.job.id) + "_" + self.job.name else: # This should never happen msg = "Error: Unknown autodriver error (status=%d)" % ( ret["runjob"]) - elif ret["copyout"] != 0: - msg += "Error: Copy out from VM failed (status=%d)" % ( - ret["copyout"]) - - self.jobQueue.makeDead(self.job.id, msg) - - # Update the text that users see in the autograder output file - self.appendMsg(hdrfile, msg) - self.catFiles(hdrfile, self.job.outputFile) + Config.copyout_errors += 1 + msg += "Error: Copy out from VM failed (status=%d)" % (ret["copyout"]) + else: + msg = "Success: Autodriver returned normally" - # Thread exit after termination - self.detachVM(return_vm=returnVM, replace_vm=replaceVM) - self.notifyServer(self.job) + self.afterJobExecution(hdrfile, msg, returnVM) return # @@ -343,4 +288,4 @@ def run(self): if self.preVM and not vm: vm = self.job.vm = self.preVM if vm: - self.detachVM(return_vm=False, replace_vm=True) + self.detachVM(return_vm=False)