11_labs.py
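"""Video-asset pipeline: script text in, narration audio and scene images out.

Overview of what the code below does (summary of this file, not an external spec):
  1. Asks OpenAI (gpt-3.5-turbo) to suggest a folder name for the script and creates it.
  2. Generates a narration MP3 from the script via the ElevenLabs text-to-speech API.
  3. Measures the audio length and estimates one scene per ~5 seconds of audio.
  4. Uploads the MP3 to S3 and records the URL, length, and scene count in assets.txt.
  5. Generates one image per scene with the Leonardo.ai API and uploads each image to S3.

Requires OPENAI_API_KEY, XI_API_KEY, and LEONARDO_API_KEY in the environment (or a
.env file), plus AWS credentials that boto3 can resolve for the S3 uploads.
"""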
import os
import json
import logging
import random
import time

import boto3
import requests
from dotenv import load_dotenv
from mutagen.mp3 import MP3
from openai import OpenAI

# Basic logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables
load_dotenv()

# Constants
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
XI_API_KEY = os.getenv('XI_API_KEY')
client = OpenAI(api_key=OPENAI_API_KEY)
BUCKET_NAME = "cashdaily1"
VOICE_ID = "4dyF1bD2mjNHsB76Wf9q"

if not OPENAI_API_KEY or not XI_API_KEY:
    logging.error("API keys are not set in environment variables.")
    exit(1)
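# The API keys are expected to come from a .env file next to this script (loaded by
# load_dotenv above) or from the shell environment, for example (placeholder values,
# not real keys):
#   OPENAI_API_KEY=sk-...
#   XI_API_KEY=...
#   LEONARDO_API_KEY=...
# AWS credentials for the boto3 S3 uploads are resolved the usual way (environment
# variables, ~/.aws/credentials, or an instance role).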
# Step 1: Ask the user for the relative path to the script.txt file
text_file_path = input("Enter the relative path to your script.txt file: ")
if not os.path.isfile(text_file_path):
    logging.error(f"The file {text_file_path} does not exist.")
    exit(1)

# Step 2: Read the script text from the file
with open(text_file_path, 'r') as file:
    text_from_file = file.read()

# Ask OpenAI to suggest a folder name for this script
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{
        "role": "system",
        "content": f"Read the following text and suggest a concise, descriptive folder name with NO special characters and NO spaces:\n{text_from_file}"
    }]
)
print(response)  # Inspect the raw response object (debugging)

# Extract the suggested folder name from the response
folder_name = response.choices[0].message.content.strip()

# Step 3: Create the output folder next to the text file
text_file_dir = os.path.dirname(text_file_path)
folder_path = os.path.join(text_file_dir, folder_name)
if not os.path.exists(folder_path):
    os.makedirs(folder_path)
    logging.info(f"Folder created: {folder_path}")
# Step 4: Send the text to ElevenLabs for voice MP3 generation
tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
headers = {
    "Accept": "application/json",
    "xi-api-key": XI_API_KEY
}
data = {
    "text": text_from_file,  # The script text read from the file
    "model_id": "eleven_multilingual_v2",
    "voice_settings": {
        "stability": 0.5,
        "similarity_boost": 0.8,
        "style": 0.5,
        "use_speaker_boost": True
    }
}
response = requests.post(tts_url, headers=headers, json=data, stream=True)

if response.ok:
    logging.info("Text-to-Speech API call successful")
    # Save the streamed audio to a local MP3 file
    LOCAL_FILE_PATH = os.path.join(folder_path, f"{folder_name}.mp3")
    with open(LOCAL_FILE_PATH, "wb") as f:
        for chunk in response.iter_content(chunk_size=1024):
            f.write(chunk)
    logging.info(f"Audio stream saved successfully as {LOCAL_FILE_PATH}")
    # Step 5: Determine the length of the MP3 file
    try:
        audio = MP3(LOCAL_FILE_PATH)
        total_length = int(audio.info.length)
        logging.info(f"Audio length: {total_length} seconds")

        # Step 6: Calculate the total number of scenes (roughly one scene per 5 seconds)
        total_scenes = round(total_length / 5)
        logging.info(f"Total scenes calculated: {total_scenes}")

        # Step 7: Upload the MP3 file to S3
        S3_KEY = f"{folder_name}/{os.path.basename(LOCAL_FILE_PATH)}"
        s3_client = boto3.client('s3')
        s3_client.upload_file(LOCAL_FILE_PATH, BUCKET_NAME, S3_KEY, ExtraArgs={'ContentType': 'audio/mpeg'})
        s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{S3_KEY}"
        logging.info(f"File uploaded to S3 at URL: {s3_url}")

        # Step 8: Save the URL, audio length, and total scenes to assets.txt
        with open(os.path.join(folder_path, 'assets.txt'), 'a') as file:
            file.write(f"MP3 URL: {s3_url}\n")
            file.write(f"Total Length: {total_length} seconds\n")
            file.write(f"Total Scenes: {total_scenes}\n")
        logging.info("URL, audio length, and total scenes saved to assets.txt in the script directory")
    except Exception as e:
        logging.error(f"Error during MP3 handling or S3 upload: {str(e)}")
else:
    logging.error(f"Failed to generate speech: {response.text}")
    exit(1)  # Without the MP3 and assets.txt, the image steps below cannot run
# Begin the image generation process: one image per scene
# Leonardo.ai API URL and headers
post_url = "https://cloud.leonardo.ai/api/rest/v1/generations"
api_key = os.getenv('LEONARDO_API_KEY')
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": f"Bearer {api_key}"
}

# Step 9: Improve the image prompts. Define the prompt-improvement URL
prompt_improve_url = "https://cloud.leonardo.ai/api/rest/v1/prompt/improve"

# Predefined prompts
prompts = [
    "a successful young female digital marketer in a luxurious setting (home, yacht, cafe, beach, poolside, or similar), using high-tech devices for her work",
    "a successful young female digital marketer in a luxurious setting enjoying her leisure time that success has afforded. Having a fun time with friends or family"
]
# Step 10: Read the total scenes from assets.txt to determine how many images to generate
assets_file_path = os.path.join(folder_path, 'assets.txt')
with open(assets_file_path, 'r') as file:
    lines = file.readlines()
    total_scenes_line = next(line for line in lines if "Total Scenes" in line)
    num_images = int(total_scenes_line.split(': ')[1].strip())  # Extract the number of scenes
    logging.info(f"Number of images to generate: {num_images}")

# Local paths of downloaded images, collected for the S3 upload step
local_image_paths = []

# num_images is now set dynamically from the total scenes
print(f"Number of images to generate: {num_images}")
for i in range(num_images):
    # Randomly choose one of the predefined prompts
    chosen_prompt = random.choice(prompts)
    print(f"Using original prompt: {chosen_prompt}")  # Debug: which prompt was chosen

    # Improve the prompt using the API
    improve_payload = {"prompt": chosen_prompt}
    improve_response = requests.post(prompt_improve_url, json=improve_payload, headers=headers)
    # Wait 5 seconds to ensure the prompt has been improved
    time.sleep(5)

    if improve_response.status_code == 200:
        response_json = improve_response.json()
        improved_prompt = response_json.get('promptGeneration', {}).get('prompt', None)
        # Debug: show the full response from the API
        print("API Improve Response:", json.dumps(response_json, indent=4))
        # Skip this scene if the prompt was not actually improved
        if improved_prompt is None or improved_prompt == chosen_prompt:
            print("No improvement on the prompt. Skipping this scene.")
            continue  # Use `break` instead to stop the entire loop
        print(f"Improved prompt: {improved_prompt}")  # Debug: the improved prompt
    else:
        print("Failed to improve prompt, skipping this scene.")  # Debug: failure case
        print("HTTP Status:", improve_response.status_code, "Response:", improve_response.text)
        continue  # Use `break` instead to stop the entire loop

    # Payload for the POST request to generate an image
    payload = {
        "height": 1024,
        "prompt": improved_prompt,  # Always the improved prompt; failures were skipped above
        "modelId": "aa77f04e-3eec-4034-9c07-d0f619684628",
        "width": 576,
        "alchemy": True,
        "photoReal": True,
        "photoRealVersion": "v2",
        "presetStyle": "CINEMATIC",
        "num_images": 1
    }
    # Make the POST request to generate the image
    post_response = requests.post(post_url, headers=headers, json=payload)
    if post_response.status_code == 200:
        post_data = post_response.json()
        generation_id = post_data['sdGenerationJob']['generationId']

        # Poll the generation until it completes, fails, or the attempt limit is reached
        max_attempts = 20
        attempts = 0
        while attempts < max_attempts:
            get_url = f"https://cloud.leonardo.ai/api/rest/v1/generations/{generation_id}"
            get_response = requests.get(get_url, headers=headers)
            if get_response.status_code == 200:
                get_data = get_response.json()
                if get_data['generations_by_pk']['status'] == 'COMPLETE':
                    if get_data['generations_by_pk']['generated_images']:
                        # Download every generated image for this scene
                        for img in get_data['generations_by_pk']['generated_images']:
                            image_url = img['url']
                            image_response = requests.get(image_url)
                            if image_response.status_code == 200:
                                image_filename = image_url.split("/")[-1]
                                local_image_path = os.path.join(folder_path, image_filename)
                                with open(local_image_path, 'wb') as f:
                                    f.write(image_response.content)
                                local_image_paths.append(local_image_path)
                                print(f"Image downloaded successfully: {image_url}")
                        break  # Generation complete and downloaded; stop polling
                    else:
                        print("No images generated yet, checking again...")
                elif get_data['generations_by_pk']['status'] == 'FAILED':
                    print("Generation failed.")
                    break
            else:
                print("Failed to retrieve generation details, HTTP Status:", get_response.status_code)
            time.sleep(15)
            attempts += 1
        if attempts == max_attempts:
            print("Maximum attempts reached, generation may still be processing.")
    else:
        print("Failed to generate images, HTTP Status:", post_response.status_code, "Response:", post_response.text)
# Step 11: Upload all images to S3 once they have all been downloaded
if len(local_image_paths) == num_images:
    s3_client = boto3.client('s3')
    for local_image_path in local_image_paths:
        image_filename = os.path.basename(local_image_path)
        s3_key = f"{folder_name}/{image_filename}"
        s3_client.upload_file(local_image_path, BUCKET_NAME, s3_key, ExtraArgs={'ContentType': 'image/jpeg'})
        s3_image_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{s3_key}"
        print(f"Image uploaded to S3 at URL: {s3_image_url}")
        # Optionally save the S3 URL to assets.txt
        with open(os.path.join(folder_path, 'assets.txt'), 'a') as file:
            file.write(f"S3 Image URL: {s3_image_url}\n")
else:
    print("Error: Not all images were downloaded. Only", len(local_image_paths), "were successfully downloaded.")