Skip to content

token cleanup, preflight check of files #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 129 additions & 70 deletions downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,40 @@ def __init__(self, config, utils):
)
self.set_metadata = self.config.get_config_value("downloading", "set_metadata")

def _get_track_metadata(self, url):
"""
Fetches track metadata from Spotify API.
"""
track_id_str = self.utils.get_id_type_from_url(url)[0]
headers = {"Authorization": f"Bearer {self.utils.get_token()}"}
resp = requests.get(
f"https://api.spotify.com/v1/tracks/{track_id_str}",
headers=headers,
timeout=10,
)
resp.raise_for_status() # Raise an exception for bad status codes
return resp.json()

def _get_path_from_metadata(self, metadata):
"""
Constructs the final file path from metadata.
"""
artist = metadata["artists"][0]["name"]
track_title = metadata["name"]
album_name = metadata["album"]["name"]
album_release = metadata["album"]["release_date"]
track_number = metadata["track_number"]

filename_format = self.config.get_config_value("downloading", "track_format")
filename = filename_format.format(
artist=artist,
title=track_title,
album=album_name,
tracknumber=track_number,
year=album_release,
)
return f"{self.download_path}/{filename}.ogg"

def get_track_urls(self, link):
"""
get all tracks available in a playlist or album (spotify gives max 50 entries)
Expand Down Expand Up @@ -83,21 +117,43 @@ def download_playlist_or_album(self, link):
download songs off an album or playlist
"""

tracks = self.get_track_urls(link)
total_tracks = len(tracks)
all_track_urls = self.get_track_urls(link)
if not all_track_urls:
print("[download_playlist_or_album] No tracks found to download.")
return

for count, track in enumerate(tracks):
self.download_track(track)
print("[download_playlist_or_album] Checking for tracks to download...")

tracks_to_download = []
for url in tqdm.tqdm(all_track_urls, desc="Checking library"):
try:
metadata = self._get_track_metadata(url)
path = self._get_path_from_metadata(metadata)

if os.path.exists(path):
tqdm.tqdm.write(
f"[download_playlist_or_album] '{metadata['name']}' already exists, skipping."
)
else:
tracks_to_download.append((url, metadata))
except requests.exceptions.RequestException as e:
tqdm.tqdm.write(
f"[download_playlist_or_album] Failed to get metadata for a track, skipping. Error: {e}"
)

total_to_download = len(tracks_to_download)
print(f"[download_playlist_or_album] Found {total_to_download} new track(s) to download.")

for count, (track_url, track_metadata) in enumerate(tracks_to_download):
self.download_track(track_url, track_metadata)
print(
f"[download_playlist_or_album] Progress: {count + 1}/{total_tracks}\n"
f"[download_playlist_or_album] Progress: {count + 1}/{total_to_download}\n"
)

def download_track(self, url):
def download_track(self, url, metadata):
"""
download a track
download a track, metadata must be pre-fetched
"""

try:
timeout = self.config.get_config_value("downloading", "timeout")

Expand All @@ -110,91 +166,94 @@ def download_track(self, url):
'[download_track] "timeout" from config file must be a number (without quotes).'
)

track_id = TrackId.from_uri(
f"spotify:track:{self.utils.get_id_type_from_url(url)[0]}"
)
headers = {"Authorization": f"Bearer {self.utils.get_token()}"}

resp = requests.get(
f"https://api.spotify.com/v1/tracks/{self.utils.get_id_type_from_url(url)[0]}",
headers=headers,
timeout=10,
).json()

resp = metadata
artist = resp["artists"][0]["name"] # artist
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn’t it be better to replace resp with metadata?

track_title = resp["name"] # title
album_name = resp["album"]["name"] # album
album_release = resp["album"]["release_date"] # date
track_number = resp["track_number"] # tracknumber
cover_image = resp["album"]["images"][0] # coverart, width, height

if self.premium_downloads:
stream = self.session.content_feeder().load(
track_id, VorbisOnlyAudioQuality(AudioQuality.VERY_HIGH), False, None
)
quality = AudioQuality.VERY_HIGH if self.premium_downloads else AudioQuality.HIGH
quality_str = "VERY_HIGH" if self.premium_downloads else "HIGH"

else:
stream = self.session.content_feeder().load(
track_id, VorbisOnlyAudioQuality(AudioQuality.HIGH), False, None
)
print(f"[download_track] Requesting stream for '{track_title}' in {quality_str} quality.")

filename_format = self.config.get_config_value("downloading", "track_format")
filename = filename_format.format(
artist=artist,
title=track_title,
album=album_name,
tracknumber=track_number,
year=album_release,
track_id = TrackId.from_uri(
f"spotify:track:{self.utils.get_id_type_from_url(url)[0]}"
)
try:
stream = self.session.content_feeder().load(
track_id, VorbisOnlyAudioQuality(quality), False, None
)
except RuntimeError as e:
if "Failed fetching audio key" in str(e):
print(
f"\n[download_track] ERROR: Could not get audio key for '{track_title}'.",
file=sys.stderr,
)
print(
"[download_track] This can happen due to regional restrictions, or if you're trying to download VERY_HIGH quality on a non-premium account.",
file=sys.stderr,
)
print(f"[download_track] Skipping this track. Original error: {e}\n", file=sys.stderr)
return
raise e

path_filename_with_ext = self._get_path_from_metadata(metadata)
path_filename = os.path.splitext(path_filename_with_ext)[0]

print(f"[download_track] Downloading {track_title} by {artist}")

path_filename = f"{self.download_path}/{filename}"

if os.path.exists(path_filename + ".ogg"):
print("[download_track] Track exists, skipping")

else:
directory_path = os.path.dirname(path_filename)

if directory_path and not os.path.exists(directory_path):
os.makedirs(directory_path)

with (
open(f"{path_filename}.ogg", "wb+") as track_file,
tqdm.tqdm(
unit="B",
unit_scale=True,
unit_divisor=1024,
total=stream.input_stream.size,
bar_format="{percentage:3.0f}%|{bar:16}|{n_fmt} / {total_fmt} | {rate_fmt}, ETA {remaining}",
) as progress_bar,
):
for _ in range(int(stream.input_stream.size / 5000) + 1):
progress_bar.update(
track_file.write(stream.input_stream.stream().read(50000))
)
directory_path = os.path.dirname(path_filename)

if directory_path and not os.path.exists(directory_path):
os.makedirs(directory_path)

with (
open(f"{path_filename}.ogg", "wb+") as track_file,
tqdm.tqdm(
unit="B",
unit_scale=True,
unit_divisor=1024,
total=stream.input_stream.size,
bar_format="{percentage:3.0f}%|{bar:16}|{n_fmt} / {total_fmt} | {rate_fmt}, ETA {remaining}",
) as progress_bar,
):
chunk_size = 8192 # A common chunk size for I/O
while True:
chunk = stream.input_stream.stream().read(chunk_size)
if not chunk:
break
progress_bar.update(track_file.write(chunk))

if self.set_metadata:
tags = {
"artist": artist,
"title": track_title,
"album": album_name,
"date": album_release, # .split("-")[0],
"tracknumber": track_number,
}
if self.set_metadata:
tags = {
"artist": artist,
"title": track_title,
"album": album_name,
"date": album_release, # .split("-")[0],
"tracknumber": track_number,
}

self.utils.set_metadata(tags, cover_image, path_filename)
self.utils.set_metadata(tags, cover_image, path_filename)

def download(self, link):
"""
execute the function based on the link
"""

link_type = self.utils.get_id_type_from_url(link)[1]

if link_type == "track":
self.download_track(link)
try:
metadata = self._get_track_metadata(link)
path = self._get_path_from_metadata(metadata)
if os.path.exists(path):
print("[download] Track already exists, skipping.")
else:
self.download_track(link, metadata)
except requests.exceptions.RequestException as e:
print(f"[download] Failed to get metadata for track: {e}", file=sys.stderr)

elif link_type in ("album", "playlist"):
self.download_playlist_or_album(link)
Expand Down
34 changes: 34 additions & 0 deletions pyspodl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/usr/bin/env python3

"""
pyspodl - a spotify downloader using librespot
"""

import logging
from arguments import get_arguments

from downloader import Downloader
from config import Config
from utils import Utils

def main():
logging.basicConfig(level=logging.INFO)

arguments = get_arguments()
config = Config(arguments.config_path)
utils = Utils(config)

downloader = Downloader(config, utils)

if not arguments.link:
print("[main] No download links provided. Use -l or --link. Exiting.")
return

for link in arguments.link.split(" "):
downloader.download(link)

utils.clear_token()


if __name__ == "__main__":
main()
21 changes: 20 additions & 1 deletion utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def generate_new_token(self):
config["account"]["token"] = resp["access_token"]

try:
with open("config.toml", "w", encoding="utf-8") as f:
with open(self.config.config_file, "w", encoding="utf-8") as f:
toml.dump(config, f)
print("[generate_new_token] Token was updated")

Expand Down Expand Up @@ -120,6 +120,25 @@ def set_metadata(self, metadata, cover_image, filename):
pass # fuck you
# seriously fuck it, idk why it happens

def clear_token(self):
"""
Clears the access token from the config file upon exit.
"""
print("\n[clear_token] Clearing access token from config file...")
try:
config_data = self.config.read_config()
if config_data and "token" in config_data.get("account", {}):
config_data["account"]["token"] = ""
with open(self.config.config_file, "w", encoding="utf-8") as f:
toml.dump(config_data, f)
print("[clear_token] Token cleared successfully.")
else:
print("[clear_token] Token not found in config, nothing to clear.")
except (ConfigError, FileNotFoundError) as e:
print(f"[clear_token] Could not read or find config file: {e}", file=sys.stderr)
except Exception as e:
print(f"[clear_token] An unexpected error occurred while clearing token: {e}", file=sys.stderr)

def get_session(self):
"""
create a user session and return it
Expand Down