Skip to content

Distribute chunks over slaves #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 95 additions & 44 deletions prt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import urllib
import urllib2
import uuid
import Queue

from distutils.spawn import find_executable

Expand Down Expand Up @@ -90,6 +91,8 @@ def colored(msg, *args):
NEW_TRANSCODER_NAME = "plex_transcoder"
ORIGINAL_TRANSCODER_NAME = "Plex Transcoder"

SEGMENTS_PER_NODE = 5

REMOTE_ARGS = ("%(env)s;"
"cd %(working_dir)s;"
"%(command)s %(args)s")
Expand Down Expand Up @@ -128,6 +131,7 @@ def printf(message, *args, **kwargs):
sys.stdout.write(colored(message % args, color, attrs=attrs))
sys.stdout.flush()


def get_auth_token():
url = "https://plex.tv/users/sign_in.json"
payload = urllib.urlencode({
Expand Down Expand Up @@ -248,6 +252,7 @@ def overwrite_transcoder_after_upgrade():
print "Transcoder hasn't been previously installed, please use install option"
sys.exit(1)


def build_env(host=None):
# TODO: This really should be done in a way that is specific to the target
# in the case that the target is a different architecture than the host
Expand Down Expand Up @@ -303,6 +308,29 @@ def transcode_local():
if output and is_debug:
log.debug(output.strip('\n'))


def get_available_remote_servers():
config = get_config()
servers = config["servers"]
available_servers = {}

for hostname, host in servers.items():
log.debug("Getting load for host '%s'" % hostname)
host["load"] = get_system_load_remote(hostname, host["port"], host["user"])

if not host["load"]:
# If no load is returned, then it is likely that the host
# is offline or unreachable
log.debug("Couldn't get load for host '%s'" % hostname)
continue

available_servers[hostname] = host

log.debug("Available servers : %s\n" % available_servers)

return available_servers


def transcode_remote():
setup_logging()

Expand Down Expand Up @@ -356,53 +384,74 @@ def transcode_remote():
except Exception, e:
log.error("Error retreiving host list via '%s': %s" % (config["servers_script"], str(e)))

hostname, host = None, None

# Let's try to load-balance
min_load = None
for hostname, host in servers.items():

log.debug("Getting load for host '%s'" % hostname)
load = get_system_load_remote(hostname, host["port"], host["user"])

if not load:
# If no load is returned, then it is likely that the host
# is offline or unreachable
log.debug("Couldn't get load for host '%s'" % hostname)
command = command.replace("127.0.0.1", config["ipaddress"]).split(' ')
segment_time = int(command[command.index("-segment_time") + 1])
ss = int(command[command.index("-ss") + 1])
start_segment = int(command[command.index("-segment_start_number") + 1])
q = Queue.Queue()
init = True
just_finished = False
transcoding_servers = []
consecutive_errors = 0
log.info("Initializing distributed trancode %s" % command)
log.info("Segment time: %s" % segment_time)

while (consecutive_errors < 5) and (q.empty() is False or init is True or just_finished is True):
log.info("Fetching available servers")
available_servers = get_available_remote_servers()
just_finished = False

for hostname, host in available_servers.items():
log.info("Checking server %s" % hostname)
if hostname in transcoding_servers:
log.info("Server already transcoding a segment")
continue

log.info("Starting trancoder with segment %s" % str(start_segment))
proc = process_segment(host, hostname, start_segment, segment_time, ss, command)
transcoding_servers.append(hostname)
start_segment+=SEGMENTS_PER_NODE
ss+=segment_time*SEGMENTS_PER_NODE
q.put((proc, hostname))

if init:
log.info("Distributed transcode initialized")
init = False
continue

log.debug("Log for '%s': %s" % (hostname, str(load)))
proc, hostname = q.get()
log.info("Checking if %s finished transcode" % hostname)
code = proc.poll()

# XXX: Use more that just 1-minute load?
if min_load is None or min_load[1] > load[0]:
min_load = (hostname, load[0],)

if min_load is None:
log.info("No hosts found...using local")
return transcode_local()
if code is None:
q.put((proc, hostname))
else:
log.info("%s finished transcode" % hostname)
just_finished = True
if code == 1:
consecutive_errors += 1
log.info("Transcode returned an error (%s)" % str(consecutive_errors))
else:
consecutive_errors = 0

# Select lowest-load host
log.info("Host with minimum load is '%s'" % min_load[0])
hostname, host = min_load[0], servers[min_load[0]]
transcoding_servers.remove(hostname)

log.info("Using transcode host '%s'" % hostname)
time.sleep(.300)

# Remap the 127.0.0.1 reference to the proper address
command = command.replace("127.0.0.1", config["ipaddress"])
log.info("Transcode finished")

#
# TODO: Remap file-path to PMS URLs
#

args = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + [command]
def process_segment (host, hostname, segment, time, ss, command):
command = ["ssh", "%s@%s" % (host["user"], hostname), "-p", host["port"]] + command
cmd = []

log.info("Launching transcode_remote with args %s\n" % args)
command[command.index("-segment_start_number") + 1] = str(segment)
command[command.index("-ss") + 1] = str(ss)
command.insert(command.index("-i"), "-t")
command.insert(command.index("-i"), str(int(command[command.index("-segment_time") + 1]) * SEGMENTS_PER_NODE))

# Spawn the process
proc = subprocess.Popen(args)
proc.wait()

log.info("Transcode stopped on host '%s'" % hostname)
return subprocess.Popen(command)


def re_get(regex, string, group=0, default=None):
Expand All @@ -415,6 +464,7 @@ def re_get(regex, string, group=0, default=None):
return match.groups()
return default


def et_get(node, attrib, default=None):
if node is not None:
return node.attrib.get(attrib, default)
Expand All @@ -437,6 +487,7 @@ def get_plex_sessions(auth_token=None):
}
return sessions


def get_sessions():
sessions = {}

Expand Down Expand Up @@ -487,6 +538,7 @@ def get_sessions():
sessions[m.groups()[0]] = data
return sessions


def check_config():
"""
Run through various diagnostic checks to see if things are configured
Expand Down Expand Up @@ -597,13 +649,13 @@ def usage():
print "Usage:\n"
print " %s [options]\n" % os.path.basename(sys.argv[0])
print (
"Options:\n\n"
" usage, help, -h, ? Show usage page\n"
" get_load Show the load of the system\n"
" get_cluster_load Show the load of all systems in the cluster\n"
" install Install PRT for the first time and then sets up configuration\n"
" overwrite Fix PRT after PMS has had a version update breaking PRT\n"
" add_host Add an extra host to the list of slaves PRT is to use\n"
"Options:\n\n"
" usage, help, -h, ? Show usage page\n"
" get_load Show the load of the system\n"
" get_cluster_load Show the load of all systems in the cluster\n"
" install Install PRT for the first time and then sets up configuration\n"
" overwrite Fix PRT after PMS has had a version update breaking PRT\n"
" add_host Add an extra host to the list of slaves PRT is to use\n"
" remove_host Removes a host from the list of slaves PRT is to use\n"
" sessions Display current sessions\n"
" check_config Checks the current configuration for errors\n")
Expand Down Expand Up @@ -707,4 +759,3 @@ def main():
else:
usage()
sys.exit(-1)