|
| 1 | +import os |
| 2 | +import openai |
| 3 | +import requests |
| 4 | +import boto3 |
| 5 | +from mutagen.mp3 import MP3 |
| 6 | +from dotenv import load_dotenv |
| 7 | + |
| 8 | +# Setup basic configuration for logging |
| 9 | +import logging |
| 10 | +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| 11 | + |
| 12 | +# Load environment variables |
| 13 | +load_dotenv() |
| 14 | + |
| 15 | +# Constants |
| 16 | +OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') |
| 17 | +XI_API_KEY = os.getenv('XI_API_KEY') |
| 18 | +BUCKET_NAME = "cashdaily1" |
| 19 | +VOICE_ID = "4dyF1bD2mjNHsB76Wf9q" |
| 20 | + |
| 21 | +if not OPENAI_API_KEY or not XI_API_KEY: |
| 22 | + logging.error("API keys are not set in environment variables.") |
| 23 | + exit(1) |
| 24 | + |
| 25 | +# Initialize OpenAI client |
| 26 | +openai.api_key = OPENAI_API_KEY |
| 27 | + |
| 28 | +# Step 1. Ask user for the relative path to the script.txt file and the step to start from |
| 29 | +text_file_path = input("Enter the relative path to your script.txt file: ") |
| 30 | +start_step = int(input("Enter the step number to start from (1-10): ")) |
| 31 | + |
| 32 | +if not os.path.isfile(text_file_path): |
| 33 | + logging.error(f"The file {text_file_path} does not exist.") |
| 34 | + exit(1) |
| 35 | + |
| 36 | +text_file_dir = os.path.dirname(os.path.abspath(text_file_path)) |
| 37 | + |
| 38 | +# Step 2. Read text from file |
| 39 | +if start_step <= 2: |
| 40 | + try: |
| 41 | + with open(text_file_path, 'r') as file: |
| 42 | + text_to_speak = file.read() |
| 43 | + logging.info("Text read successfully from script.txt") |
| 44 | + except Exception as e: |
| 45 | + logging.error(f"Failed to read text file: {str(e)}") |
| 46 | + exit(1) |
| 47 | + |
| 48 | +# Step 3. Generate a folder name using OpenAI |
| 49 | +if start_step <= 3: |
| 50 | + try: |
| 51 | + response = openai.Completion.create( |
| 52 | + model="gpt-3.5-turbo", |
| 53 | + prompt=f"Generate a concise, descriptive name for a folder based on the following text: {text_to_speak}", |
| 54 | + max_tokens=10, |
| 55 | + temperature=0.7 |
| 56 | + ) |
| 57 | + folder_name = response.choices[0].text.strip() |
| 58 | + folder_path = os.path.join(text_file_dir, folder_name) |
| 59 | + if not os.path.exists(folder_path): |
| 60 | + os.makedirs(folder_path) |
| 61 | + logging.info(f"Folder created: {folder_path}") |
| 62 | + except Exception as e: |
| 63 | + logging.error(f"OpenAI API error or folder creation error: {str(e)}") |
| 64 | + exit(1) |
| 65 | + |
| 66 | + |
| 67 | + |
| 68 | +# Step 4: Send text to elevenlabs for voice mp3 generation |
| 69 | +if start_step <= 6: |
| 70 | + tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream" |
| 71 | + headers = { |
| 72 | + "Accept": "application/json", |
| 73 | + "xi-api-key": XI_API_KEY |
| 74 | + } |
| 75 | + data = { |
| 76 | + "text": text_to_speak, |
| 77 | + "model_id": "eleven_multilingual_v2", |
| 78 | + "voice_settings": { |
| 79 | + "stability": 0.5, |
| 80 | + "similarity_boost": 0.8, |
| 81 | + "style": 0.5, |
| 82 | + "use_speaker_boost": True |
| 83 | + } |
| 84 | + } |
| 85 | + response = requests.post(tts_url, headers=headers, json=data, stream=True) |
| 86 | + if response.ok: |
| 87 | + logging.info("Text-to-Speech API call successful") |
| 88 | + # Save the audio stream to a file |
| 89 | + LOCAL_FILE_PATH = os.path.join(folder_path, f"{folder_name}.mp3") |
| 90 | + with open(LOCAL_FILE_PATH, "wb") as f: |
| 91 | + for chunk in response.iter_content(chunk_size=1024): |
| 92 | + f.write(chunk) |
| 93 | + logging.info(f"Audio stream saved successfully as {LOCAL_FILE_PATH}") |
| 94 | + |
| 95 | + # Step 4: Find length of mp3 file |
| 96 | + try: |
| 97 | + audio = MP3(LOCAL_FILE_PATH) |
| 98 | + total_length = int(audio.info.length) |
| 99 | + logging.info(f"Audio length: {total_length} seconds") |
| 100 | + |
| 101 | + # Step 6: Calculate total scenes |
| 102 | + total_scenes = round(total_length / 5) |
| 103 | + logging.info(f"Total scenes calculated: {total_scenes}") |
| 104 | + |
| 105 | + # Step 7: Upload mp3 file to s3 |
| 106 | + S3_KEY = f"{folder_name}/{os.path.basename(LOCAL_FILE_PATH)}" |
| 107 | + s3_client = boto3.client('s3') |
| 108 | + s3_client.upload_file(LOCAL_FILE_PATH, BUCKET_NAME, S3_KEY) |
| 109 | + s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{S3_KEY}" |
| 110 | + logging.info(f"File uploaded to S3 at URL: {s3_url}") |
| 111 | + |
| 112 | + # Step 8: Save the URL, audio length, and total scenes to assets.txt |
| 113 | + with open(os.path.join(folder_path, 'assets.txt'), 'a') as file: |
| 114 | + file.write(f"MP3 URL: {s3_url}\n") |
| 115 | + file.write(f"Total Length: {total_length} seconds\n") |
| 116 | + file.write(f"Total Scenes: {total_scenes}\n") |
| 117 | + logging.info("URL, audio length, and total scenes saved to assets.txt in the script directory") |
| 118 | + except Exception as e: |
| 119 | + logging.error(f"Error during MP3 handling or S3 upload: {str(e)}") |
| 120 | + else: |
| 121 | + logging.error(f"Failed to generate speech: {response.text}") |
| 122 | + |
| 123 | +# Begin Image generation process. 1 image per scene |
| 124 | +# Define the API URL and headers |
| 125 | +post_url = "https://cloud.leonardo.ai/api/rest/v1/generations" |
| 126 | +api_key = os.getenv('LEONARDO_API_KEY') |
| 127 | +headers = { |
| 128 | + "accept": "application/json", |
| 129 | + "content-type": "application/json", |
| 130 | + "authorization": f"Bearer {api_key}" |
| 131 | +} |
| 132 | + |
| 133 | +# Step 9: Improve prompts for images. Define prompt improvement URL |
| 134 | +prompt_improve_url = "https://cloud.leonardo.ai/api/rest/v1/prompt/improve" |
| 135 | + |
| 136 | +# Predefined prompts |
| 137 | +prompts = [ |
| 138 | + "a successful young female digital marketer in a luxurious setting (home, yacht, cafe, beach, poolside, or similar), using high-tech devices for her work", |
| 139 | + "a successful young female digital marketer in a luxurious setting enjoying her leisure time that success has afforded. Having a fun time with friends or family" |
| 140 | +] |
| 141 | + |
| 142 | +# Step 10: Read the total scenes from assets.txt to determine the number of images to generate |
| 143 | +assets_file_path = os.path.join(folder_path, 'assets.txt') |
| 144 | +try: |
| 145 | + with open(assets_file_path, 'r') as file: |
| 146 | + lines = file.readlines() |
| 147 | + total_scenes_line = next(line for line in lines if "Total Scenes" in line) |
| 148 | + num_images = int(total_scenes_line.split(': ')[1].strip()) # Extract the number of scenes |
| 149 | +except Exception as e: |
| 150 | + logging.error(f"Failed to read or parse assets.txt: {str(e)}") |
| 151 | + exit(1) |
| 152 | + |
| 153 | +# Now num_images is set dynamically based on the total scenes |
| 154 | +print(f"Number of images to generate: {num_images}") |
| 155 | + |
| 156 | + |
| 157 | +for i in range(num_images): |
| 158 | + # Randomly choose between the predefined prompts |
| 159 | + chosen_prompt = random.choice(prompts) |
| 160 | + print(f"Using original prompt: {chosen_prompt}") # Debugging line to check which prompt is chosen |
| 161 | + |
| 162 | + # Improve the prompt using the API |
| 163 | + improve_payload = {"prompt": chosen_prompt} |
| 164 | + improve_response = requests.post(prompt_improve_url, json=improve_payload, headers=headers) |
| 165 | + |
| 166 | + # Wait for 5 seconds to ensure the prompt has been improved |
| 167 | + time.sleep(5) |
| 168 | + |
| 169 | + if improve_response.status_code == 200: |
| 170 | + response_json = improve_response.json() |
| 171 | + improved_prompt = response_json.get('promptGeneration', {}).get('prompt', None) |
| 172 | + |
| 173 | + # Debugging line to check the full response from the API |
| 174 | + print("API Improve Response:", json.dumps(response_json, indent=4)) |
| 175 | + |
| 176 | + # Check if the prompt was actually improved |
| 177 | + if improved_prompt is None or improved_prompt == chosen_prompt: |
| 178 | + print("No improvement on the prompt. Stopping the process.") |
| 179 | + continue # Skip this iteration or use `break` to stop the entire loop |
| 180 | + |
| 181 | + print(f"Improved prompt: {improved_prompt}") # Debugging line to check the improved prompt |
| 182 | + else: |
| 183 | + print("Failed to improve prompt, using base prompt.") # Debugging line for failure case |
| 184 | + print("HTTP Status:", improve_response.status_code, "Response:", improve_response.text) # Additional debugging information |
| 185 | + continue # Skip this iteration or use `break` to stop the entire loop |
| 186 | + |
| 187 | + # Payload for the POST request to generate images |
| 188 | + payload = { |
| 189 | + "height": 1024, |
| 190 | + "prompt": improved_prompt, # Ensure this uses the improved prompt |
| 191 | + "modelId": "aa77f04e-3eec-4034-9c07-d0f619684628", |
| 192 | + "width": 576, |
| 193 | + "alchemy": True, |
| 194 | + "photoReal": True, |
| 195 | + "photoRealVersion": "v2", |
| 196 | + "presetStyle": "CINEMATIC", |
| 197 | + "num_images": 1 |
| 198 | + } |
| 199 | + |
| 200 | + # Make the POST request to generate images |
| 201 | + post_response = requests.post(post_url, headers=headers, json=payload) |
| 202 | + if post_response.status_code == 200: |
| 203 | + post_data = post_response.json() |
| 204 | + print("API Response:", json.dumps(post_data, indent=4)) |
| 205 | + generation_id = post_data['sdGenerationJob']['generationId'] |
| 206 | + max_attempts = 20 |
| 207 | + attempts = 0 |
| 208 | + while attempts < max_attempts: |
| 209 | + get_url = f"https://cloud.leonardo.ai/api/rest/v1/generations/{generation_id}" |
| 210 | + get_response = requests.get(get_url, headers=headers) |
| 211 | + if get_response.status_code == 200: |
| 212 | + get_data = get_response.json() |
| 213 | + if get_data['generations_by_pk']['status'] == 'COMPLETE': |
| 214 | + if get_data['generations_by_pk']['generated_images']: |
| 215 | + # Use the folder_path from earlier in the script |
| 216 | + if not os.path.exists(folder_path): |
| 217 | + os.makedirs(folder_path) |
| 218 | + for img in get_data['generations_by_pk']['generated_images']: |
| 219 | + image_url = img['url'] |
| 220 | + image_response = requests.get(image_url) |
| 221 | + if image_response.status_code == 200: |
| 222 | + image_filename = image_url.split("/")[-1] |
| 223 | + with open(f'{folder_path}/{image_filename}', 'wb') as f: |
| 224 | + f.write(image_response.content) |
| 225 | + print(f"Image downloaded successfully: {image_url}") |
| 226 | + break |
| 227 | + else: |
| 228 | + print("No images generated yet, checking again...") |
| 229 | + elif get_data['generations_by_pk']['status'] == 'FAILED': |
| 230 | + print("Generation failed.") |
| 231 | + break |
| 232 | + else: |
| 233 | + print("Failed to retrieve generation details, HTTP Status:", get_response.status_code) |
| 234 | + time.sleep(15) |
| 235 | + attempts += 1 |
| 236 | + if attempts == max_attempts: |
| 237 | + print("Maximum attempts reached, generation may still be processing.") |
| 238 | + else: |
| 239 | + print("Failed to generate images, HTTP Status:", post_response.status_code, "Response:", post_response.text) |
| 240 | + |
| 241 | + # Step 11: Upload all images to S3 once they are all downloaded |
| 242 | + for local_image_path in local_image_paths: |
| 243 | + image_filename = os.path.basename(local_image_path) |
| 244 | + s3_key = f"{folder_name}/{image_filename}" |
| 245 | + s3_client.upload_file(local_image_path, BUCKET_NAME, s3_key) |
| 246 | + s3_image_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{s3_key}" |
| 247 | + print(f"Image uploaded to S3 at URL: {s3_image_url}") |
| 248 | + |
| 249 | + # Optionally save the S3 URL to assets.txt |
| 250 | + with open(os.path.join(folder_path, 'assets.txt'), 'a') as file: |
| 251 | + file.write(f"S3 Image URL: {s3_image_url}\n") |
0 commit comments