diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..af88af7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__ +config.conf* +misc +logs/* +.fuse_hidden* +.vscode +*.log +*.code-workspace diff --git a/ChaturbateRecorder.py b/ChaturbateRecorder.py deleted file mode 100644 index 8f2d299..0000000 --- a/ChaturbateRecorder.py +++ /dev/null @@ -1,228 +0,0 @@ -import time -import datetime -import os -import threading -import sys -import configparser -import streamlink -import subprocess -import queue -import requests - -if os.name == 'nt': - import ctypes - kernel32 = ctypes.windll.kernel32 - kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) - -mainDir = sys.path[0] -Config = configparser.ConfigParser() -setting = {} - -recording = [] - -hilos = [] - -def cls(): - os.system('cls' if os.name == 'nt' else 'clear') - -def readConfig(): - global setting - - Config.read(mainDir + '/config.conf') - setting = { - 'save_directory': Config.get('paths', 'save_directory'), - 'wishlist': Config.get('paths', 'wishlist'), - 'interval': int(Config.get('settings', 'checkInterval')), - 'postProcessingCommand': Config.get('settings', 'postProcessingCommand'), - } - try: - setting['postProcessingThreads'] = int(Config.get('settings', 'postProcessingThreads')) - except ValueError: - if setting['postProcessingCommand'] and not setting['postProcessingThreads']: - setting['postProcessingThreads'] = 1 - - if not os.path.exists(f'{setting["save_directory"]}'): - os.makedirs(f'{setting["save_directory"]}') - -def postProcess(): - while True: - while processingQueue.empty(): - time.sleep(1) - parameters = processingQueue.get() - model = parameters['model'] - path = parameters['path'] - filename = os.path.split(path)[-1] - directory = os.path.dirname(path) - file = os.path.splitext(filename)[0] - subprocess.call(setting['postProcessingCommand'].split() + [path, filename, directory, model, file, 'cam4']) - -class Modelo(threading.Thread): - def __init__(self, modelo): - threading.Thread.__init__(self) - self.modelo = modelo - self._stopevent = threading.Event() - self.file = None - self.online = None - self.lock = threading.Lock() - - def run(self): - global recording, hilos - isOnline = self.isOnline() - if isOnline == False: - self.online = False - else: - self.online = True - self.file = os.path.join(setting['save_directory'], self.modelo, f'{datetime.datetime.fromtimestamp(time.time()).strftime("%Y.%m.%d_%H.%M.%S")}_{self.modelo}.mp4') - try: - session = streamlink.Streamlink() - streams = session.streams(f'hlsvariant://{isOnline}') - stream = streams['best'] - fd = stream.open() - if not isModelInListofObjects(self.modelo, recording): - os.makedirs(os.path.join(setting['save_directory'], self.modelo), exist_ok=True) - with open(self.file, 'wb') as f: - self.lock.acquire() - recording.append(self) - for index, hilo in enumerate(hilos): - if hilo.modelo == self.modelo: - del hilos[index] - break - self.lock.release() - while not (self._stopevent.isSet() or os.fstat(f.fileno()).st_nlink == 0): - try: - data = fd.read(1024) - f.write(data) - except: - fd.close() - break - if setting['postProcessingCommand']: - processingQueue.put({'model': self.modelo, 'path': self.file}) - except Exception as e: - with open('log.log', 'a+') as f: - f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n') - self.stop() - finally: - self.exceptionHandler() - - def exceptionHandler(self): - self.stop() - self.online = False - self.lock.acquire() - for index, hilo in enumerate(recording): - if hilo.modelo == self.modelo: - del recording[index] - break - self.lock.release() - try: - file = os.path.join(os.getcwd(), self.file) - if os.path.isfile(file): - if os.path.getsize(file) <= 1024: - os.remove(file) - except Exception as e: - with open('log.log', 'a+') as f: - f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n') - - def isOnline(self): - try: - resp = requests.get(f'https://chaturbate.com/api/chatvideocontext/{self.modelo}/') - hls_url = '' - if 'hls_source' in resp.json(): - hls_url = resp.json()['hls_source'] - if len(hls_url): - return hls_url - else: - return False - except: - return False - - def stop(self): - self._stopevent.set() - -class CleaningThread(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - self.interval = 0 - self.lock = threading.Lock() - - def run(self): - global hilos, recording - while True: - self.lock.acquire() - new_hilos = [] - for hilo in hilos: - if hilo.is_alive() or hilo.online: - new_hilos.append(hilo) - hilos = new_hilos - self.lock.release() - for i in range(10, 0, -1): - self.interval = i - time.sleep(1) - -class AddModelsThread(threading.Thread): - def __init__(self): - threading.Thread.__init__(self) - self.wanted = [] - self.lock = threading.Lock() - self.repeatedModels = [] - self.counterModel = 0 - - def run(self): - global hilos, recording - lines = open(setting['wishlist'], 'r').read().splitlines() - self.wanted = (x for x in lines if x) - self.lock.acquire() - aux = [] - for model in self.wanted: - model = model.lower() - if model in aux: - self.repeatedModels.append(model) - else: - aux.append(model) - self.counterModel = self.counterModel + 1 - if not isModelInListofObjects(model, hilos) and not isModelInListofObjects(model, recording): - thread = Modelo(model) - thread.start() - hilos.append(thread) - for hilo in recording: - if hilo.modelo not in aux: - hilo.stop() - self.lock.release() - -def isModelInListofObjects(obj, lista): - result = False - for i in lista: - if i.modelo == obj: - result = True - break - return result - -if __name__ == '__main__': - readConfig() - if setting['postProcessingCommand']: - processingQueue = queue.Queue() - postprocessingWorkers = [] - for i in range(0, setting['postProcessingThreads']): - t = threading.Thread(target=postProcess) - postprocessingWorkers.append(t) - t.start() - cleaningThread = CleaningThread() - cleaningThread.start() - while True: - try: - readConfig() - addModelsThread = AddModelsThread() - addModelsThread.start() - i = 1 - for i in range(setting['interval'], 0, -1): - cls() - if len(addModelsThread.repeatedModels): print('The following models are more than once in wanted: [\'' + ', '.join(modelo for modelo in addModelsThread.repeatedModels) + '\']') - print(f'{len(hilos):02d} alive Threads (1 Thread per non-recording model), cleaning dead/not-online Threads in {cleaningThread.interval:02d} seconds, {addModelsThread.counterModel:02d} models in wanted') - print(f'Online Threads (models): {len(recording):02d}') - print('The following models are being recorded:') - for hiloModelo in recording: print(f' Model: {hiloModelo.modelo} --> File: {os.path.basename(hiloModelo.file)}') - print(f'Next check in {i:02d} seconds\r', end='') - time.sleep(1) - addModelsThread.join() - del addModelsThread, i - except: - break diff --git a/README.md b/README.md old mode 100644 new mode 100755 index 736f896..60edbe1 --- a/README.md +++ b/README.md @@ -1,22 +1,19 @@ # ChaturbateRecorder -All credits to @beaston02 and @ahsand97 +All credits to @beaston02, @ahsand97 and @Damianonymous -This is script to automate the recording of public webcam shows from chaturbate.com. +This is a script to automate the recording of public webcam shows from chaturbate.com. - -I have tested this on debian(7+8), ubuntu 14, freenas10 (inside a jail), and Mac OS X (10.10.4), but it should run on other OSs -I do not have a windows machine to test on, but I had another user test it on windows and has reported the 6/21/17 update as working on windows 10 using python3.6.2 (may also work on python3.5+) ## Requirements -Requires python3.5 or newer. You can grab python3.5.2 from https://www.python.org/downloads/release/python-352/ +Requires python3.5 or newer. -to install required modules, run: +to install the required modules, run: ``` -python3.5 -m pip install streamlink bs4 lxml gevent +python3 -m pip install streamlink bs4 lxml gevent ``` -Edit the config file (config.conf) to point to the directory you want to record to, where your "wanted" file is located, which genders, and the interval between checks (in seconds) +Copy the config file `config.conf.dist` to `config.conf` and edit to point to the directory you want to record to, where your "wishlist" file is located, which genders, and the interval between checks (in seconds) -Add models to the "wanted.txt" file (only one model per line). The model should match the models name in their chatrooms URL (https://chaturbate.com/{modelname}/). T clarify this, it should only be the "modelname" portion, not the entire url. +Add models to the "wishlist.txt" file (only one model per line). The model should match the model's name in their chatrooms URL (https://chaturbate.com/{modelname}/). To clarify this, it should only be the "modelname" portion, not the entire url. diff --git a/config.conf b/config.conf.dist old mode 100644 new mode 100755 similarity index 61% rename from config.conf rename to config.conf.dist index 30a0707..b429d07 --- a/config.conf +++ b/config.conf.dist @@ -1,6 +1,6 @@ [paths] -wishlist = ./wanted.txt -save_directory = ./captures +wishlist = wishlist.txt +save_directory = . # set the directory structure - default is "{path}/{model}/{st}_{model}.mp4" # {path} = save_directory set above. (your directory structure should start with this) @@ -11,7 +11,7 @@ save_directory = ./captures # example using a madeup "hannah" who is female: {path}/{gender}/{model}/{year}/{year}.{month}.{day}_{hour}.{minutes}.{seconds}_{model}.mp4 = "/Users/Joe/chaturbate/Female/hannah/2017/{year}.07.26_19.34.47_hannah.mp4" # This will also be the "Download directory" if you set a "completed_directory" -directory_structure = {path}/{model}/{year}.{month}.{day}_{hour}.{minutes}.{seconds}_{model}.mp4 +directory_structure = {path}/{model}/{model}_{year}{month}{day}_{hour}{minutes}{seconds}.mp4 # (OPTIONAL) - leave blank if you dont want files moved after completed @@ -19,11 +19,15 @@ directory_structure = {path}/{model}/{year}.{month}.{day}_{hour}.{minutes}.{seco # if this is left empty, the videos wll remain in the same directory they were originally saved to # This path should be to a directory, not a filename! so do not include the filename portion, only the directory. -completed_directory = +completed_directory = [settings] -checkInterval = 20 +check_online_interval = 300 + +# Specify the maximum duration of recordings in minutes. Shows longer than this are split into separate files + +max_duration_mins = 60 # Specify the genders you would like to monitor to record. Separate multiple genders with a comma # acceptable genders are female, male, trans, and couple @@ -34,25 +38,24 @@ genders = female, couple # You can set a command to be ran on the file once it is completed. This can be any sort of a script you would like. # You can create a script to convert the video via ffmpeg to make it compatible for certain devices, create a contact sheet of the video # upload the video to a cloud storage drive via rclone, or whatever else you see fit. -# set the string to be the same as you you would type into terminal to call the script manually. -# The peramaters which will be passed to the script are as follows: -# 1 = full file path (ie: /Users/Joe/chaturbate/Female/hannah/2017/2017.07.26_19.34.47_hannah.mp4) -# 2 = filename (ie : 2017.07.26_19.34.47_hannah.mp4) -# 3 = directory (ie : /Users/Joe/chaturbate/Female/hannah/2017/) -# 4 = models name (ie: hannah) -# 5 = gender (ie: Female) -# to call a bash script called "MoveToGoogleDrive.sh" and located in the user Joes home directory, you would use: +# Set the string to be the same as you would type into the terminal to call the script manually. +# The variables that can be substituted as part of the script command are as follows: +# {path} = full file path (ie. /Users/Joe/chaturbate/Female/hannah/2017/2017.07.26_19.34.47_hannah.mp4) +# {filename} = filename (ie. 2017.07.26_19.34.47_hannah.mp4) +# {directory} = directory (ie. /Users/Joe/chaturbate/Female/hannah/2017/) +# {file} = The base filename, without extension (ie. 2017.07.26_19.34.47_hannah) +# {model} = models name (ie. hannah) +# For example to re-encode the file with ffmpeg, you can use +# postProcessingCommand = "ffmpeg -i {path} {directory}{file}_encoded.mp4" +# +# To call a bash script called "MoveToGoogleDrive.sh" and located in the user Joes home directory, you would use: # postProcessingCommand = "bash /Users/Joe/home/MoveToGoogleDrive.sh" -# this script will be ran on the files "download location" prior to it being moved to its "completed location". -# The moving of the file will not take place if a post processing script is ran, so if you want to move it once it is completed, do so through commands in the script. +# This script will be ran on the files in the "save_directory" prior to it being moved to its "completed_directory". +# The moving of the files will not take place if a post processing script is ran, so if you want to move it once it is completed, do so through commands in the script. -postProcessingCommand = +postProcessingCommand = # Because depending on what the post processing script does, it may be demanding on the system. # Set the maximum number of concurrent post processing scripts you would like to be ran at one time. -postProcessingThreads = - -[login] -username = -password = +postProcessingThreads = 2 diff --git a/config.py b/config.py new file mode 100755 index 0000000..2c55be1 --- /dev/null +++ b/config.py @@ -0,0 +1,27 @@ +import configparser +import os +import sys + +mainDir = sys.path[0] + +def readConfig(): + config = configparser.ConfigParser() + config.read(mainDir + '/config.conf') + settings = { + 'save_directory': config.get('paths', 'save_directory'), + 'directory_structure': config.get('paths', 'directory_structure').lower(), + 'wishlist': config.get('paths', 'wishlist'), + 'interval': int(config.get('settings', 'check_online_interval')), + 'max_duration': int(config.get('settings', 'max_duration_mins')), + 'postProcessingCommand': config.get('settings', 'postProcessingCommand'), + } + try: + settings['postProcessingThreads'] = int(config.get('settings', 'postProcessingThreads')) + except ValueError: + if settings['postProcessingCommand'] and not settings['postProcessingThreads']: + settings['postProcessingThreads'] = 1 + + if not os.path.exists(f'{settings["save_directory"]}'): + os.makedirs(f'{settings["save_directory"]}') + + return settings diff --git a/ImportFollowed.py b/crawler.py old mode 100644 new mode 100755 similarity index 64% rename from ImportFollowed.py rename to crawler.py index d8e9ef2..5f06f4e --- a/ImportFollowed.py +++ b/crawler.py @@ -1,22 +1,24 @@ -import requests, configparser, sys, pickle, os +import requests, sys, pickle, os from bs4 import BeautifulSoup +from queue import Queue +from threading import Thread +import config +site = 'https://chaturbate.com/' -followed = [] - -Config = configparser.ConfigParser() -Config.read(sys.path[0] + "/config.conf") -wishlist = Config.get('paths', 'wishlist') -username = Config.get('login', 'username') -password = Config.get('login', 'password') +settings = config.readConfig() +wishlist = settings['wishlist'] +username = settings['username'] +password = settings['password'] +followed = [] def login(): s.headers = { 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36', - 'referer': 'https://chaturbate.com/', - 'origin': 'https://chaturbate.com', + 'referer': site, + 'origin': site.rstrip('/'), 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'accept-encoding': 'gzip, deflate, br', 'accept-language': 'en-US,en;q=0.8', @@ -27,11 +29,11 @@ def login(): data = {'username': username, 'password': password, 'next': ''} - result = s.get("https://chaturbate.com/") + result = s.get(site) soup = BeautifulSoup(result.text, "html.parser") data['csrfmiddlewaretoken'] = soup.find('input', {'name': 'csrfmiddlewaretoken'}).get('value') - result = s.post('https://chaturbate.com/auth/login/?next=/', data=data, cookies=result.cookies) + result = s.post(f'{site}auth/login/?next=/', data=data, cookies=result.cookies) if not checkLogin(result): print('login failed - please check your username and password is set correctly in the config file.') exit() @@ -46,11 +48,25 @@ def checkLogin(result): else: return True +def rememberSession(): + with open (sys.path[0] + "/" + username + '.pickle', 'wb') as f: + pickle.dump(s, f) + def getModels(): - print("getting followed models...") + q = Queue() + workers = [] + while not q.empty(): + for i in range(10): + t = Thread(target=getOnlineModels) + workers.append(t) + t.start() + for t in workers: + t.join() + +def getOnlineModels(): page = 1 while True: - result = s.get('https://chaturbate.com/followed-cams/?keywords=&page={}'.format(page)) + result = s.get(f'{site}followed-cams/?keywords=&page={page}') soup = BeautifulSoup(result.text, 'lxml') LIST = soup.findAll('ul', {'class': 'list'})[0] models = LIST.find_all('div', {'class': 'title'}) @@ -70,19 +86,11 @@ def getModels(): s = pickle.load(f) else: s = requests.session() - result = s.get('https://chaturbate.com/') + result = s.get(site) if not checkLogin(result): login() + getModels() print('{} followed models'.format(len(set(followed)))) - f = open(wishlist, 'r') - wanted = list(set(f.readlines())) - wanted = [m.strip('\n').split('chaturbate.com/')[-1].lower().strip().replace('/', '') for m in wanted] - print('{} models currently in the wanted list'.format(len(wanted))) - followed.extend(wanted) - f= open(wishlist, 'w') - for model in set(followed): - f.write(model + '\n') - print('{} models have been added to the wanted list'.format(len(set(followed)) - len(set(wanted)))) - with open (sys.path[0] + "/" +username + '.pickle', 'wb') as f: - pickle.dump(s, f) + + diff --git a/file_modified.py b/file_modified.py new file mode 100755 index 0000000..aeffb68 --- /dev/null +++ b/file_modified.py @@ -0,0 +1,19 @@ +import os +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler, FileModifiedEvent + +class FileModifiedHandler(FileSystemEventHandler): + def __init__(self, file_name, callback): + self.file_name = file_name + self.callback = callback + self.path = os.path.dirname(file_name) + + # Set observer to watch for changes in the directory + self.observer = Observer() + self.observer.schedule(self, self.path, recursive=False) + self.observer.start() + self.observer.join() + + def on_modified(self, event): + if not event.is_directory and event.src_path.endswith(self.file_name): + self.callback() \ No newline at end of file diff --git a/getModels.py b/getModels.py deleted file mode 100644 index 1407be7..0000000 --- a/getModels.py +++ /dev/null @@ -1,68 +0,0 @@ -import gevent, requests, sys, re, configparser -from threading import Thread -from queue import Queue -from bs4 import BeautifulSoup - -Config = configparser.ConfigParser() -Config.read(sys.path[0] + "/config.conf") -genders = re.sub(' ', '', Config.get('settings', 'genders')).split(",") -lastPage = {'female': 100, 'couple': 100, 'trans': 100, 'male': 100} - - -def getOnlineModels(): - global lastPage - global q - global online - if not q.empty(): - args = q.get() - page = args[0] - gender = args[1] - if page < lastPage[gender]: - attempt = 1 - while attempt <= 3: - try: - timeout = gevent.Timeout(8) - timeout.start() - URL = "https://chaturbate.com/{gender}-cams/?page={page}".format(gender=gender.lower(), page=page) - result = requests.request('GET', URL) - result = result.text - soup = BeautifulSoup(result, 'lxml') - if lastPage[gender] == 100: - lastPage[gender] = int(soup.findAll('a', {'class': 'endless_page_link'})[-2].string) - if int(soup.findAll('li', {'class': 'active'})[1].string) == page: - LIST = soup.findAll('ul', {'class': 'list'})[0] - models = LIST.find_all('div', {'class': 'title'}) - for model in models: - online.append(model.find_all('a', href=True)[0].string.lower()[1:]) - break - except gevent.Timeout: - attempt = attempt + 1 - if attempt > 3: - break - - -def getModels(): - workers = [] - for gender in genders: - if gender == 'couple': - for i in range(1, 3): - q.put([i, gender]) - else: - for i in range(1, 30): - q.put([i, gender]) - while not q.empty(): - for i in range(10): - t = Thread(target=getOnlineModels) - workers.append(t) - t.start() - for t in workers: - t.join() - -if __name__ == '__main__': - q = Queue() - online = [] - workers = [] - getModels() - online = list(set(online)) - for model in online: - print(model) diff --git a/log.py b/log.py new file mode 100755 index 0000000..0b21abf --- /dev/null +++ b/log.py @@ -0,0 +1,23 @@ +import logging + +logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s") +rootLogger = logging.getLogger() + +fileHandler = logging.FileHandler("{0}/{1}.log".format('.', 'log')) +fileHandler.setFormatter(logFormatter) +rootLogger.addHandler(fileHandler) + +# consoleHandler = logging.StreamHandler() +# consoleHandler.setFormatter(logFormatter) +# rootLogger.addHandler(consoleHandler) + +logging.FileHandler('logs/log.log', mode='a') + +def log(message): + logging.info(message) + +def error(message): + logging.error(message) + +def exception(e): + logging.exception(e) diff --git a/main.py b/main.py new file mode 100755 index 0000000..c63bae4 --- /dev/null +++ b/main.py @@ -0,0 +1,86 @@ +import time +import os +from tqdm import tqdm +from wakepy import set_keepawake + +from wishlist import Wishlist +from monitor import Monitor +import config +import log + +# Enable ANSI escape sequence processing in Windows +if os.name == 'nt': + import ctypes + kernel32 = ctypes.windll.kernel32 + kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) + +screen_refresh = 1 + +settings = {} +threads = [] + +def cls(): + os.system('cls' if os.name == 'nt' else 'clear') + +if __name__ == '__main__': + cls() + set_keepawake() + + settings = config.readConfig() + try: + wishlist = Wishlist(settings['wishlist']) + wishlist.start() + + pbars = { + 'recorded': tqdm(desc='No recordings yet', bar_format='{desc}'), + 'processing': [], + 'recording': {} + } + + app = Monitor(wishlist, pbars) + app.start() + + i = 1 + if app.postprocess: + for processing_thread in app.postprocess.workers: + processing_pbar = tqdm(desc=f'Processing #{i:02d}', total=100, bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}{postfix}') + processing_pbar.display('Waiting') + processing_thread.setPbar(processing_pbar) + pbars['processing'].append(processing_pbar) + i += 1 + + # TODO: Key press to force-refresh models? + + while True: + for i in range(settings['interval'], 0, -screen_refresh): + time.sleep(screen_refresh) + + if len(app.done) > 0: + recorded_models = set(recording['model'] for recording in app.done) + status_message = f"{len(app.done)} recorded models: {', '.join(recorded_models)}" + if app.postprocess: + status_message += f" ({app.postprocess.queue.qsize()} queued for processing)" + pbars['recorded'].set_description(status_message ) + + # Update processing progress bars - done already + # for processing_thread in app.postprocess.workers: + + if len(app.recording_threads) > 0: + for model_thread in app.recording_threads.values(): + file_info = model_thread.info() + duration = file_info['duration'] + file_size = file_info['file_size'] / 1024 / 1024 + duration_in_minutes = duration.total_seconds() / 60 + model_thread.pbar.n = min(duration_in_minutes, model_thread.max_duration) + model_thread.pbar.set_postfix({'size': f'>{file_size:>7.2f}Mb'}) + + except KeyboardInterrupt: + pass + except Exception as e: + log.exception(e) + + # cls() + if len(app.done) > 0: + print(f'Completed {len(app.done)} recordings:') + for done in app.done: + print(f'[{(done["duration"] / 60):3.0f} minutes] {done["resolution"]} {done["path"]} ({done["file_size_mb"]:.2f}Mb at {(done["total_bitrate"] / 1024):.0f}kbps)') diff --git a/model.py b/model.py new file mode 100755 index 0000000..7613db8 --- /dev/null +++ b/model.py @@ -0,0 +1,180 @@ +import datetime +import threading +import os +import time +import streamlink +import requests + +import config +import log + +class Model(threading.Thread): + def __init__(self, model, app): + threading.Thread.__init__(self) + self._stopevent = threading.Event() + self.running = True + + settings = config.readConfig() + + self.model = model + self.hls_cache_file = f'logs/stream_{self.model}.txt' + self.start_time = None + self.app = app + self.file = None + self.directory = settings['save_directory'] + self.max_duration = settings['max_duration'] or 0 + self.online = None + self.hls_source = None + self.pbar = None + + def generateFilename(self): + settings = config.readConfig() + now = datetime.datetime.now() + self.file = settings['directory_structure'].format( + path=self.directory, + model=self.model, + seconds=now.strftime("%S"), + minutes=now.strftime("%M"), + hour=now.strftime("%H"), + day=now.strftime("%d"), + month=now.strftime("%m"), + year=now.strftime("%Y") + ) + + def run(self): + settings = config.readConfig() + + while self.running: + if not self.isOnline(): + # It was recording - stop now + if self.online: + self.stopRecording() + + time.sleep(settings['interval']) + continue + + # Shouldn't happen? Another thread recording the same model? + if self.app.isRecording(self.model): + time.sleep(1) + continue + + # Model is online - start recording + self.startRecording() + + + def info(self): + ret = { + 'model': self.model, + 'online': self.online, + 'start_time': self.start_time, + } + + ret['duration'] = datetime.datetime.now() - self.start_time + ret['file_size'] = os.path.getsize(self.file) + + return ret + + def isOnline(self): + if self.getCachedStream(): + return True + + try: + model_url = f'https://chaturbate.com/api/chatvideocontext/{self.model}/' + resp = requests.get(model_url) + + if resp.headers.get('content-type') != 'application/json': + log.error(f'{self.model} couldn\'t be checked - potential CloudFlare filtering') + return False + + hls_url = '' + if 'hls_source' in resp.json(): + hls_url = resp.json()['hls_source'] + if len(hls_url): + self.hls_source = hls_url + self.cacheStream(hls_url) + + return True + else: + self.hls_source = True + self.clearCache() + return False + except Exception as e: + log.exception(f'EXCEPTION: {e}') + return False + + def startRecording(self): + self.online = True + self.generateFilename() + self._stopevent.clear() + self.start_time = datetime.datetime.now() + + try: + session = streamlink.Streamlink() + streams = session.streams(f'hlsvariant://{self.hls_source}') + stream = streams['best'] + with stream.open() as hls_stream: + os.makedirs(os.path.join(self.directory, self.model), exist_ok=True) + + f = open(self.file, 'wb') + self.app.startRecording(self) + + while not (self._stopevent.isSet() or os.fstat(f.fileno()).st_nlink == 0): + try: + # Break file into 1h chunks + if self.max_duration: + delta = datetime.datetime.now() - self.start_time + minutes = delta.total_seconds() / 60 + if minutes > self.max_duration: + self.app.processRecording(self.model, self.file, self.info()['duration']) + self.start_time = datetime.datetime.now() + self.generateFilename() + f = open(self.file, 'wb') + + data = hls_stream.read(1024) + f.write(data) + except: + hls_stream.close() + self.clearCache() + break + + except Exception as e: + log.exception(f'EXCEPTION: {e}') + finally: + if self.online: + self.stopRecording() + + def cacheStream(self, stream): + with open(self.hls_cache_file, 'w') as f: + f.write(stream) + + def getCachedStream(self): + if os.path.isfile(self.hls_cache_file): + with open(self.hls_cache_file, 'r') as f: + hls_url = f.readline() + if len(hls_url): + self.hls_source = hls_url + return True + + return False + + def clearCache(self): + if os.path.isfile(self.hls_cache_file): + os.remove(self.hls_cache_file) + + def stopRecording(self): + self.online = False + self.app.stopRecording(self) + self.start_time = None + self._stopevent.set() + self.hls_source = None + self.pbar = None + + # If file is too small, delete it + if self.file: + try: + if os.path.isfile(self.file) and os.path.getsize(self.file) <= 1024: + os.remove(self.file) + except Exception as e: + log.exception(f'EXCEPTION: {e}') + + self.file = None diff --git a/monitor.py b/monitor.py new file mode 100755 index 0000000..5f7cf6f --- /dev/null +++ b/monitor.py @@ -0,0 +1,149 @@ +import threading +import time +import os +import ffmpeg +from tqdm import tqdm + +from model import Model +from postprocessing import PostProcessing +import config + + +settings = config.readConfig() + +class Monitor(threading.Thread): + def __init__(self, wishlist, pbars): + threading.Thread.__init__(self, daemon=True) + self.lock = threading.Lock() + + self.wishlist = wishlist + self.pbars = pbars + self.monitoring_threads = {} + self.recording_threads = {} + self.done = [] + + self.postprocess = None + if settings['postProcessingCommand']: + self.postprocess = PostProcessing(settings['postProcessingCommand'], settings['postProcessingThreads'] or 2) + + def isHandled(self, model): + return self.isMonitored(model) or self.isRecording(model) + + def isMonitored(self, model): + return model in (model for model in self.monitoring_threads.keys()) + + def isRecording(self, model): + return model in (model for model in self.recording_threads.keys()) + + def startRecording(self, modelThread): + self.lock.acquire() + self.recording_threads[modelThread.model] = modelThread + del self.monitoring_threads[modelThread.model] + self.lock.release() + + self.attachPbar(modelThread) + + def attachPbar(self, modelThread): + modelPbar = tqdm( + desc=f'Recording {modelThread.model:32s}', + total=modelThread.max_duration, + bar_format='{desc} |{bar}| {elapsed}{postfix}', + leave=False + ) + self.pbars['recording'][modelThread.model] = modelPbar + modelThread.pbar = modelPbar + + def stopRecording(self, modelThread): + self.lock.acquire() + self.monitoring_threads[modelThread.model] = modelThread + if modelThread.model in self.recording_threads: + del self.recording_threads[modelThread.model] + + self.lock.release() + + if modelThread.model in self.pbars['recording']: + self.pbars['recording'][modelThread.model].clear() + self.pbars['recording'][modelThread.model].close() + del self.pbars['recording'][modelThread.model] + + if os.path.isfile(modelThread.file) and os.path.getsize(modelThread.file) > 1024: + self.processRecording(modelThread.model, modelThread.file) + + def processRecording(self, model, file): + if self.postprocess: + self.postprocess.add({'model': model, 'path': file}) + + probe = ffmpeg.probe(file) + file_size_mb = os.path.getsize(file) / 1024 / 1024 + video = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) + audio = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None) + recording = { + 'model': model, + 'path': file, + 'file_size': os.path.getsize(file), + 'file_size_mb': file_size_mb, + 'duration': float(probe['format']['duration']), + 'width': int(video['width']), + 'height': int(video['height']), + 'resolution': f"{video['width']}x{video['height']}", + 'video_bitrate': int(video['bit_rate']) if 'bit_rate' in video else (file_size_mb / (float(probe['format']['duration']) * .45)), + 'video_codec': f"{video['codec_name']} - {video['codec_long_name']}", + 'frame_rate': video['r_frame_rate'], + 'audio_codec': f"{audio['codec_name']} - {audio['codec_long_name']}", + 'audio_bitrate': int(audio['bit_rate']), + 'format': f"{probe['format']['format_name']} - {probe['format']['format_long_name']}", + 'total_bitrate': int(probe['format']['bit_rate']), + } + + self.done.append(recording) + + def stopMonitoring(self, model): + self.lock.acquire() + self.monitoring_threads[model].running = False + self.monitoring_threads[model].join() + del self.monitoring_threads[model] + self.lock.release() + + def startMonitoring(self, model): + thread = Model(model, self) + thread.daemon = True + thread.start() + + self.lock.acquire() + self.monitoring_threads[model] = thread + self.lock.release() + + def cleanThreads(self): + self.lock.acquire() + models = list(self.recording_threads.keys()) + self.lock.release() + + for model in models: + if model not in self.wishlist.wishlist: + self.recording_threads[model].stopRecording() + self.stopMonitoring(model) + + # models = self.recording_threads.keys() + # for model in models: + # if not self.recording_threads[model].is_alive(): + # self.recording_threads[model].stopRecording() + + # models = self.monitoring_threads.keys() + # for model in models: + # if not self.monitoring_threads[model].is_alive(): + # self.stopMonitoring(model) + + + def loop(self): + # Kill off stopped threads (will be recreated in the next step if needed) + self.cleanThreads() + + # Start a thread for each model in our wishlist + for model in self.wishlist.wishlist: + if not self.isHandled(model): + self.startMonitoring(model) + + def run(self): + while True: + self.loop() + time.sleep(1) diff --git a/postprocessing.py b/postprocessing.py new file mode 100755 index 0000000..4038644 --- /dev/null +++ b/postprocessing.py @@ -0,0 +1,25 @@ +import queue + +from processing_worker import ProcessingWorker + +class PostProcessing(): + def __init__(self, cmd, thread_count): + if not cmd: + return + + # TODO: Handle ffmpeg encoding within the app (with processing progress bar) + self.cmd = cmd + self.thread_count = thread_count + self.queue = queue.Queue() + self.workers = [] + + self.start() + + def start(self): + for i in range(0, self.thread_count): + thread = ProcessingWorker(self.queue, self.cmd) + self.workers.append(thread) + thread.start() + + def add(self, item): + self.queue.put(item) diff --git a/processing_worker.py b/processing_worker.py new file mode 100755 index 0000000..977c4ac --- /dev/null +++ b/processing_worker.py @@ -0,0 +1,54 @@ +import subprocess +import time +import threading +import os +import json +from tqdm import tqdm +from mmisp import process + +class ProcessingWorker(threading.Thread): + def __init__(self, queue, cmd): + super().__init__() + self.daemon = True + + self.cmd = cmd + self.queue = queue + self.sleep = 1 # 60 + self.current_file = None + self.pbar = None + + def setPbar(self, pbar: tqdm): + self.pbar = pbar + + def update_progress(self, percent_complete): + if isinstance(self.pbar, tqdm): + self.pbar.n = percent_complete + self.pbar.refresh() + + def run(self): + while True: + while self.queue.empty(): + time.sleep(self.sleep) + + parameters = self.queue.get() + self.current_file = parameters['path'] + + if self.cmd == 'mmisp': + with open('process.json', 'r') as f: + process.run(parameters['path'], + json.load(f), + progress_callback=self.update_progress) + else: + replacements = { + 'path': parameters['path'], + 'filename': os.path.split(parameters['path'])[-1], + 'directory': os.path.dirname(parameters['path']), + 'file': os.path.splitext(os.path.split(parameters['path'])[-1])[0], + 'model': parameters['model'] + } + cmd = self.cmd.format(replacements) + subprocess.call(cmd.split()) + + self.pbar.clear() + self.pbar.display('Waiting') + self.current_file = None diff --git a/wanted.txt b/wanted.txt old mode 100644 new mode 100755 index c94b214..5a59abd --- a/wanted.txt +++ b/wanted.txt @@ -1,6 +1 @@ -chanellls -chroniclove -cum4myass - - - +kloeking_ diff --git a/wishlist.py b/wishlist.py new file mode 100755 index 0000000..fb25e0c --- /dev/null +++ b/wishlist.py @@ -0,0 +1,36 @@ +import threading + +import config +import log +from file_modified import FileModifiedHandler + +settings = config.readConfig() + +class Wishlist(threading.Thread): + def __init__(self, wishlist_file): + threading.Thread.__init__(self, daemon=True) + self.wishlist = [] + self.wishlist_file = wishlist_file + + def reload(self): + lines = [] + with open(self.wishlist_file, 'r') as f: + lines = f.read().splitlines() + + new_list = [] + for model in lines: + model = model.strip().lower() + if not model or ' ' in model: + continue + if model in new_list: + log.error(f'The {model} model is listed more than once in the wishlist') + continue + + new_list.append(model) + + self.wishlist = new_list + + def run(self): + self.reload() + + FileModifiedHandler(self.wishlist_file, self.reload) diff --git a/ws.py b/ws.py new file mode 100755 index 0000000..df60009 --- /dev/null +++ b/ws.py @@ -0,0 +1,23 @@ +import websocket +import rel + +import config + +settings = config.readConfig() + +def on_message(ws, message): + print(message) + +try: + ws = websocket.WebSocketApp( + f"wss://realtime.pa.highwebmedia.com/?access_token={settings['access_token']}&format=json&heartbeats=true&v=2&agent=ably-js%2F1.2.37%20browser&remainPresentFor=0", + on_message=on_message, + ) + + # Set dispatcher to automatic reconnection, 5 second reconnect delay if connection closed unexpectedly + ws.run_forever(dispatcher=rel, reconnect=5) + # Keyboard Interrupt + rel.signal(2, rel.abort) + rel.dispatch() +except Exception as e: + print(e)