diff --git a/selenium-web-automation/README.md b/selenium-web-automation/README.md new file mode 100644 index 0000000000..c71b86c7fe --- /dev/null +++ b/selenium-web-automation/README.md @@ -0,0 +1,27 @@ +# Modern Web Automation With Python and Selenium + +This folder contains the code for the Real Python tutorial on [Modern Web Automation With Python and Selenium](https://realpython.com/modern-web-automation-with-python-and-selenium/). + +## Setup + +Create and activate a virtual environment, then install the dependencies: + +```sh +(venv) $ python -m pip install . +``` + +## Usage + +To start streaming music from BandCamp's _Discover_ section, you can execute the script: + +```sh +(venv) $ bandcamp-player +``` + +## Author + +Martin Breuss – martin@realpython.com + +## License + +This project is distributed under the MIT license. diff --git a/selenium-web-automation/bandcamp_player/pyproject.toml b/selenium-web-automation/bandcamp_player/pyproject.toml new file mode 100644 index 0000000000..5f732cea49 --- /dev/null +++ b/selenium-web-automation/bandcamp_player/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "bandcamp_player" +version = "0.1.0" +description = "A web player for Bandcamp using Selenium" +authors = [ + { name = "Martin Breuss", email = "martin@realpython.com" }, + { name = "Bartosz Zaczyński", email = "bartosz@realpython.com" }, +] +dependencies = [ + "selenium", + "textual", +] +[project.scripts] +bandcamp-player = "bandcamp.__main__:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-data] +"*" = ["*.css"] diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/__init__.py b/selenium-web-automation/bandcamp_player/src/bandcamp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/__main__.py 
def main() -> None:
    """Start both worker threads, run the TUI, then ask the workers to stop.

    The stop messages are sent from a ``finally`` clause, so they go out
    even when the app exits with an exception.
    """
    workers = []
    # Each worker is started right after it is created, storage first.
    for worker_type in (StorageWorker, WebWorker):
        worker = worker_type()
        worker.start()
        workers.append(worker)
    storage_worker, web_worker = workers
    try:
        app = BandcampApp(storage_worker, web_worker)
        app.run()
    finally:
        for worker in workers:
            worker.inbox.put(Message.GRACEFUL_STOP)
class Database:
    """Thin wrapper around the SQLite file that stores the listening history.

    Opening the database creates the ``history`` table when it is missing.
    """

    def __init__(self):
        connection = sqlite3.connect(DATABASE_PATH)
        self.connection = connection
        self.cursor = connection.cursor()
        self.create_table()

    def create_table(self):
        """Create the ``history`` table unless it already exists."""
        self.cursor.execute(SQL_CREATE)
        self.connection.commit()

    def persist(self, track: Track):
        """Store ``track`` unless a row with the same id is already present."""
        track_id = track.id
        self.cursor.execute(SQL_SELECT_ONE, (track_id,))
        if self.cursor.fetchone():
            # Already recorded; nothing to insert or commit.
            return
        self.cursor.execute(SQL_INSERT, (track_id, *astuple(track)))
        self.connection.commit()

    def find_all(self):
        """Return every stored row as a ``Track``."""
        self.cursor.execute(SQL_SELECT_ALL)
        tracks = []
        for row in self.cursor.fetchall():
            # Drop the leading id column and the trailing timestamp column.
            tracks.append(Track(*row[1:-1]))
        return tracks

    def close(self):
        """Close the underlying SQLite connection."""
        self.connection.close()
class BandcampApp(AppStateMixin, App):
    """Textual application that shows the player and playlist widgets."""

    CSS_PATH = "app.css"
    TITLE = "Bandcamp Player"
    # NOTE(review): "toggle_play" presumably dispatches to
    # AppStateMixin.action_toggle_play via Textual's action naming - confirm.
    BINDINGS = [
        ("q", "quit", "Quit"),
        ("space", "toggle_play", "Play/Pause"),
    ]

    def __init__(self, storage_worker, web_worker):
        """Keep references to both workers so the state mixins can reach them."""
        super().__init__()
        self.storage_worker = storage_worker
        self.web_worker = web_worker

    def compose(self):
        """Yield the header, the footer, the player and the playlist."""
        yield Header()
        yield Footer()
        yield Player(classes="horizontal")
        yield Playlist(classes="horizontal")
class PlaylistStateMixin:
    """State handling for the playlist widget: paging and starting a track.

    Expects to be mixed into a widget that provides ``self.app`` (with a
    ``web_worker``), ``self.query_one`` and ``self.call_after_refresh``.
    """

    # Page size used to continue the track numbering across pages. It was
    # previously a literal 8 inside update_table(); override it on the class
    # if the page shows a different number of tracks.
    TRACKS_PER_PAGE = 8

    def on_mount(self):
        """Fill the table with the tracks of the current page."""
        self.update_table()

    def move_next(self):
        """Ask the web worker for the next page, then refresh the table."""
        self.app.web_worker.request(Message.NEXT_PAGE)
        self.update_table()

    def move_previous(self):
        """Ask the web worker for the previous page, then refresh the table."""
        self.app.web_worker.request(Message.PREVIOUS_PAGE)
        self.update_table()

    def play_row(self, index: int):
        """Play the track at ``index`` on the current page and update the UI.

        The worker replies with the track title, which becomes the label
        text. The Play button is hidden, the Pause button is shown, and the
        track is persisted after the next refresh.
        """
        title = self.app.web_worker.request((Message.PLAY_TRACK, index))
        self.app.query_one("#current_track", Label).update(title)
        self.app.query_one("#play").add_class("hidden")
        self.app.query_one("#pause").remove_class("hidden")
        self.call_after_refresh(self.app.persist_current_track)

    def update_table(self):
        """Replace the table rows with the tracks of the current page."""
        page_number, visible_tracks = self.app.web_worker.request(Message.PAGE)
        # Numbering continues from the previous pages instead of restarting.
        offset = self.TRACKS_PER_PAGE * (page_number - 1)
        rows = [
            [
                Text(f"{offset + i}.", justify="right"),
                track.title,
                track.artist,
                track.genre,
            ]
            for i, track in enumerate(visible_tracks, 1)
        ]
        table = self.query_one("#table", DataTable)
        table.clear()
        table.add_rows(rows)
class Playlist(PlaylistStateMixin, Static):
    """Widget made of the tracks table and the two pager buttons."""

    def compose(self):
        """Yield the tracks table, then the pager buttons in their container."""
        table = DataTable(id="table")
        table.cursor_type = "row"
        table.add_columns("Track", "Title", "Artist", "Genre")
        table.move_cursor(row=0)
        table.focus()
        yield table
        with Static(id="pager-buttons"):
            yield Button("Next Page ›", id="next")
            yield Button("‹ Previous Page", id="previous")

    @on(Button.Pressed, "#next")
    def on_next_click(self):
        """Schedule a move to the next page after the next refresh."""
        self.call_after_refresh(self.move_next)

    @on(Button.Pressed, "#previous")
    def on_previous_click(self):
        """Schedule a move to the previous page after the next refresh."""
        self.call_after_refresh(self.move_previous)

    @on(DataTable.RowSelected, "#table")
    def on_row_selected(self, event):
        """Schedule playback of the selected row after the next refresh."""
        self.call_after_refresh(self.play_row, event.cursor_row)
+""" + +import typing +from dataclasses import dataclass +from typing import Self + +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support.wait import WebDriverWait + +MAX_WAIT_SECONDS = 10.0 + + +class WebPage: + def __init__(self, driver: WebDriver) -> None: + self._driver = driver + self._wait = WebDriverWait(driver, MAX_WAIT_SECONDS) + + +class WebComponent: + def __init__(self, parent: WebElement, driver: WebDriver = None) -> None: + self._parent = parent + if driver: + self._driver = driver + self._wait = WebDriverWait(driver, MAX_WAIT_SECONDS) + + +@dataclass +class SearchMode: + many: bool + cast: type | None = None + + @classmethod + def of(cls, owner: type, name: str) -> Self: + if type_hint := owner.__annotations__.get(name): + if origin := typing.get_origin(type_hint): + arg, *_ = typing.get_args(type_hint) + if issubclass(origin, typing.Sequence): + return cls(many=True, cast=arg) + else: + raise TypeError("Can only use generic sequences") + else: + if issubclass(type_hint, typing.Sequence): + return cls(many=True) + else: + return cls(many=False, cast=type_hint) + else: + return cls(many=False) + + +class FindBy: + def __init__(self, by: str, value: str, cache: bool = True) -> None: + self._by = by + self._value = value + self._cache = cache + + def __set_name__(self, owner: type, name: str) -> None: + self._name = name + + def __get__( + self, instance: WebPage | WebComponent, owner: type + ) -> WebElement | list[WebElement] | WebComponent | list[WebComponent]: + match instance: + case WebPage(): + root = instance._driver # Search within the entire HTML body + case WebComponent(): + root = instance._parent # Narrow down the search to a DOM node + case _: + raise TypeError("FindBy must be in WebPage or WebComponent") + mode = SearchMode.of(owner, self._name) + if mode.many: + value = root.find_elements(self._by, self._value) + if mode.cast: + value = 
[mode.cast(x, instance._driver) for x in value] + else: + value = root.find_element(self._by, self._value) + if mode.cast: + value = mode.cast(value, instance._driver) + if self._cache: + setattr(owner, self._name, value) + return value diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/web/components.py b/selenium-web-automation/bandcamp_player/src/bandcamp/web/components.py new file mode 100644 index 0000000000..bdebce5c0f --- /dev/null +++ b/selenium-web-automation/bandcamp_player/src/bandcamp/web/components.py @@ -0,0 +1,114 @@ +""" +Abstractions over page fragments that appear on the Bandcamp home page. +""" + +import time + +from selenium.webdriver.common.by import By + +from bandcamp.storage.models import Track +from bandcamp.web.base import FindBy, WebComponent + + +class Player(WebComponent): + LOCATOR = (By.CSS_SELECTOR, ".discover-detail-inner") + + _play_button = FindBy(By.CSS_SELECTOR, ".playbutton") + _title = FindBy(By.CSS_SELECTOR, ".title") + _album = FindBy(By.CSS_SELECTOR, ".detail-album a") + _artist = FindBy(By.CSS_SELECTOR, ".detail-artist a") + + @property + def current_track(self) -> Track: + return Track( + title=self._title.text, + album=self._album.text, + album_url=self._album.get_attribute("href"), + artist=self._artist.text, + artist_url=self._artist.get_attribute("href"), + ) + + def play(self) -> None: + if not self.is_playing(): + self._play_button.click() + self._wait.until(lambda _: self.is_playing()) + + def pause(self) -> None: + if self.is_playing(): + self._play_button.click() + self._wait.until(lambda _: not self.is_playing()) + + def is_playing(self) -> bool: + return "playing" in self._play_button.get_attribute("class") + + +class PagerButton(WebComponent): + def click(self) -> None: + if self.is_clickable(): + self._parent.click() + + def is_clickable(self) -> bool: + return "disabled" not in self._parent.get_attribute("class") + + +class TrackItem(WebComponent): + _title = FindBy(By.CSS_SELECTOR, 
class Discover(WebComponent):
    """The paginated track list of the Discover section."""

    LOCATOR = (By.CSS_SELECTOR, ".discover-results")
    CSS_TRANSITION_SECONDS = 0.5

    # None of the lookups below are cached, so every access queries the page
    # again and reflects the page that is currently shown.
    track_items: list[TrackItem] = FindBy(
        By.CSS_SELECTOR,
        ".discover-result.result-current .discover-item",
        cache=False,
    )
    _page_number = FindBy(By.CSS_SELECTOR, ".item-page.selected", cache=False)
    _previous: PagerButton = FindBy(
        By.XPATH,
        "//a[contains(@class, 'item-page')][text()='previous']",
        cache=False,
    )
    _next: PagerButton = FindBy(
        By.XPATH,
        "//a[contains(@class, 'item-page')][text()='next']",
        cache=False,
    )

    @property
    def visible_tracks(self) -> list[Track]:
        """Tracks built from the items of the current page."""
        tracks = []
        for item in self.track_items:
            tracks.append(item.track)
        return tracks

    @property
    def page_number(self) -> int:
        """Number shown on the selected pager entry."""
        selected_page = self._page_number
        return int(selected_page.text)

    def click_previous(self) -> None:
        """Click the "previous" pager button and wait for the transition."""
        self._press(self._previous)

    def click_next(self) -> None:
        """Click the "next" pager button and wait for the transition."""
        self._press(self._next)

    def _press(self, button: PagerButton) -> None:
        """Click ``button``, then sleep for the CSS transition time."""
        button.click()
        time.sleep(Discover.CSS_TRANSITION_SECONDS)
+""" + +from bandcamp.web.base import FindBy, WebPage +from bandcamp.web.components import Discover, Player + +BANDCAMP_URL = "https://bandcamp.com/" + + +class BandcampHome(WebPage): + player: Player = FindBy(*Player.LOCATOR) + discover: Discover = FindBy(*Discover.LOCATOR) diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/workers/__init__.py b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/workers/messages.py b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/messages.py new file mode 100644 index 0000000000..1b0c9e229c --- /dev/null +++ b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/messages.py @@ -0,0 +1,14 @@ +from enum import Enum, auto + + +class Message(Enum): + ACKNOWLEDGE = auto() + CURRENT_TRACK = auto() + FIRST_TRACK = auto() + GRACEFUL_STOP = auto() + NEXT_PAGE = auto() + PAGE = auto() + PAUSE = auto() + PLAY = auto() + PLAY_TRACK = auto() + PREVIOUS_PAGE = auto() diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/workers/storage.py b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/storage.py new file mode 100644 index 0000000000..7381aa678d --- /dev/null +++ b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/storage.py @@ -0,0 +1,25 @@ +import queue +import threading + +from bandcamp.storage.database import Database +from bandcamp.workers.messages import Message + + +class StorageWorker(threading.Thread): + def __init__(self): + super().__init__() + self.inbox = queue.Queue() + + def run(self): + database = Database() + try: + while True: + match self.inbox.get(): + case Message.GRACEFUL_STOP: + break + case track: + database.persist(track) + self.inbox.task_done() + finally: + if database: + database.close() diff --git a/selenium-web-automation/bandcamp_player/src/bandcamp/workers/web.py 
b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/web.py new file mode 100644 index 0000000000..108d9eab3b --- /dev/null +++ b/selenium-web-automation/bandcamp_player/src/bandcamp/workers/web.py @@ -0,0 +1,72 @@ +import queue +import threading + +from selenium import webdriver +from selenium.webdriver.remote.webdriver import WebDriver + +from bandcamp.web.pages import BANDCAMP_URL, BandcampHome +from bandcamp.workers.messages import Message + + +class WebWorker(threading.Thread): + def __init__(self): + super().__init__() + self.inbox = queue.Queue() + self.outbox = queue.Queue() + + def run(self): + driver = open_headless_browser() + home_page = navigate_home(driver) + try: + while True: + try: + match self.inbox.get(): + case Message.GRACEFUL_STOP: + break + case [Message.PLAY_TRACK, index]: + track = home_page.discover.visible_tracks[index] + self.outbox.put(track.title) + home_page.discover.track_items[index].play() + case Message.PLAY: + home_page.player.play() + case Message.PAUSE: + home_page.player.pause() + case Message.CURRENT_TRACK: + track = home_page.player.current_track + self.outbox.put(track) + case Message.FIRST_TRACK: + track = home_page.discover.visible_tracks[0] + self.outbox.put(track) + case Message.PAGE: + page_number = home_page.discover.page_number + visible_tracks = home_page.discover.visible_tracks + self.outbox.put((page_number, visible_tracks)) + case Message.NEXT_PAGE: + home_page.discover.click_next() + self.outbox.put(Message.ACKNOWLEDGE) + case Message.PREVIOUS_PAGE: + home_page.discover.click_previous() + self.outbox.put(Message.ACKNOWLEDGE) + finally: + self.inbox.task_done() + finally: + if driver: + driver.close() + + def request(self, message: Message): + self.inbox.put(message) + try: + return self.outbox.get() + finally: + self.outbox.task_done() + + +def open_headless_browser() -> WebDriver: + options = webdriver.ChromeOptions() + options.add_argument("--headless=new") + return webdriver.Chrome(options=options) 
+ + +def navigate_home(driver) -> BandcampHome: + driver.get(BANDCAMP_URL) + return BandcampHome(driver) diff --git a/selenium-web-automation/old/README.md b/selenium-web-automation/old/README.md new file mode 100644 index 0000000000..4e876921e2 --- /dev/null +++ b/selenium-web-automation/old/README.md @@ -0,0 +1,39 @@ +# Modern Web Automation With Python and Selenium + +This folder contains the code for the Real Python tutorial on [Modern Web Automation With Python and Selenium](https://realpython.com/modern-web-automation-with-python-and-selenium/). + +## Setup + +Create and activate a virtual environment, then install the dependencies: + +```sh +(venv) $ python -m pip install -r requirements.txt +``` + +## Usage + +To start streaming music from Bandcamp's _Discover_ section, you can execute the script: + +```sh +(venv) $ python player.py +``` + +This will instantiate a `BandLeader` object and call its `.stream()` method. Music should start playing and you'll see information about the tracks you'll listen to printed to your console. + +If you want to interact with the class in a different way, then you can start a REPL session and import `BandLeader`, then go from there: + +```python +>>> from player import BandLeader +>>> b = BandLeader() +>>> dir(b) +``` + +The class will record a history of the songs you listen to in a CSV file. + +## Author + +Martin Breuss – martin@realpython.com + +## License + +This project is distributed under the MIT license. 
@dataclass
class TrackRec:
    """One entry of the listening history, as stored in the CSV file.

    The field names are also used as the CSV column names.
    """

    title: str
    artist: str
    artist_url: str
    album: str
    album_url: str
    # Time string; BandLeader.currently_playing() fills it with time.ctime().
    timestamp: str
+ """ + self.browser = self._set_up_bandcamp_browser() + self._lock = threading.RLock() + atexit.register(self._close_headless_browser) + + self.database_path = Path(csv_path) + self.temp_database: List[TrackRec] = self._load_db_state() + self._current_track_record: Optional[TrackRec] = None + + self.db_update_thread = threading.Thread( + target=self._maintain, daemon=True + ) + self.db_update_thread.start() + + self._current_track_number = 0 + self.playable_tracks = self._fetch_tracks() + self.skip_song = False + self.stop_stream = False + + self.show_tracks() + + def __enter__(self): + """Enter the runtime context related to this object.""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the runtime context.""" + self._close_headless_browser() + + # Public Methods - Playback Control + def play(self, track: Optional[int] = None): + """Play a track. + + If no track number is supplied, the presently selected track will play. + If no track has been selected yet, the first track will play. + + Args: + track (Optional[int]): The track number to play. 
+ """ + print("\nNow playing:") + with self._lock: + if track is None: + play_button = self.browser.find_element( + By.CLASS_NAME, "playbutton" + ) + play_button.click() + self._current_track_number = 1 + self._show_track_info(1) + elif 1 <= track <= len(self.playable_tracks): + self._current_track_number = track + self.playable_tracks[self._current_track_number - 1].click() + self._show_track_info(track) + else: + logging.error(f"Invalid track number: {track}") + return + + try: + WebDriverWait(self.browser, 10).until( + lambda driver: self._is_playing() + ) + if self._is_playing(): + self._current_track_record = self.currently_playing() + except Exception as e: + logging.error(f"Error starting playback: {e}") + + def pause(self): + """Pause and unpause the playback.""" + with self._lock: + try: + play_button = self.browser.find_element( + By.CLASS_NAME, "playbutton" + ) + play_button.click() + except NoSuchElementException: + logging.error("Play button not found, can't pause playback") + + def currently_playing(self) -> Optional[TrackRec]: + """Get the record for the currently playing track. + + Returns: + Optional[TrackRec]: The currently playing track information, + or None if nothing is playing. + """ + try: + with self._lock: + if self._is_playing(): + player_element = self.browser.find_element( + By.CLASS_NAME, "discover-detail" + ) + title = player_element.find_element( + By.CLASS_NAME, "title" + ).text + album_element = player_element.find_element( + By.CLASS_NAME, "detail-album" + ) + album_title = album_element.text + album_url = ( + album_element.find_element(By.TAG_NAME, "a") + .get_attribute("href") + .split("?")[0] + ) + artist_element = player_element.find_element( + By.CLASS_NAME, "detail-artist" + ).find_element(By.TAG_NAME, "a") + artist = artist_element.text + artist_url = artist_element.get_attribute("href").split( + "?" 
+ )[0] + + return TrackRec( + title=title, + artist=artist, + artist_url=artist_url, + album=album_title, + album_url=album_url, + timestamp=ctime(), + ) + except NoSuchElementException as e: + logging.error(f"Element not found in currently_playing(): {e}") + except Exception as e: + logging.error(f"There was an error in currently_playing(): {e}") + + return None + + def stream(self): + """Begin streaming music, listening for user commands.""" + input_thread = threading.Thread(target=self._listen_for_input) + input_thread.start() + + while not self.stop_stream: + if self.skip_song: + self._play_next() + self.skip_song = False + + if not self._is_playing(): + self.play() + + sleep(0.1) + + input_thread.join() + + def show_tracks(self): + """Print the available tracks to the screen.""" + for number, _ in enumerate(self.playable_tracks, start=1): + self._show_track_info(number) + + def get_new_tracks(self): + """Advance the catalogue and repopulate the track list.""" + if self._is_playing(): + self.pause() + + with self._lock: + page_buttons = self.browser.find_elements( + By.CLASS_NAME, "item-page" + ) + next_btn = page_buttons[-1] if page_buttons else None + + if next_btn: + next_btn.click() + self.playable_tracks = self._fetch_tracks() + self.show_tracks() + self._current_track_number = 1 + else: + logging.error("Next button not found, can't get new tracks") + + # Private Methods - Browser Management + def _set_up_bandcamp_browser(self) -> Firefox: + """Create a headless browser pointing to BandCamp.""" + options = Options() + options.add_argument("--headless") + browser = Firefox(options=options) + browser.get(BANDCAMP_FRONTPAGE) + WebDriverWait(browser, 10).until(EC.title_contains("Bandcamp")) + logging.info("Bandcamp page loaded successfully.") + return browser + + def _close_headless_browser(self): + """Close the headless browser.""" + if self.browser: + with self._lock: + logging.info("Closing headless browser...") + self.browser.quit() + self.browser = None + 
+ # Private Methods - Database Maintenance + def _load_db_state(self) -> List[TrackRec]: + """Load database from disk, or create new empty file.""" + if self.database_path and self.database_path.is_file(): + with self.database_path.open(newline="") as dbfile: + dbreader = csv.DictReader(dbfile) + return [TrackRec(**rec) for rec in dbreader] + else: + logging.info("No existing database found. Starting fresh.") + Path.cwd().joinpath(self.database_path).touch() + return [] + + def _maintain(self): + """Background thread that updates the database at intervals.""" + while True: + self._update_db() + sleep(DB_CHECK_DELAY) + + def _update_db(self): + """Check the currently playing track and update the database.""" + try: + needs_update = ( + self._current_track_record is not None + and ( + not self.temp_database + or self.temp_database[-1] != self._current_track_record + ) + and self._is_playing() + ) + if needs_update: + self.temp_database.append(self._current_track_record) + self._save_db() + logging.info( + f"Added track to database: {self._current_track_record}" + ) + except Exception as e: + logging.error(f"Error while updating the db: {e}") + + def _save_db(self): + """Save the current database to CSV file.""" + if self.database_path: + try: + with self.database_path.open(mode="w", newline="") as dbfile: + fieldnames = [field.name for field in fields(TrackRec)] + dbwriter = csv.DictWriter(dbfile, fieldnames=fieldnames) + dbwriter.writeheader() + for record in self.temp_database: + dbwriter.writerow(asdict(record)) + logging.info(f"Database saved to {self.database_path}") + except Exception as e: + logging.error(f"Error while saving the db: {e}") + + # Private Methods - Input Handling + def _listen_for_input(self): + """Listen for user input to control the stream.""" + while not self.stop_stream: + try: + user_input = input().lower() + if user_input == "q": + self.stop_stream = True + print("Exiting stream...") + self.pause() + elif user_input == "n": + 
self.skip_song = True + print("Skipping to next song...") + except EOFError: + break + + # Private Methods - Playback Helpers + def _is_playing(self) -> bool: + """Check if a track is currently playing. + + Returns: + bool: True if a track is playing, False otherwise. + """ + with self._lock: + try: + playbtn = self.browser.find_element( + By.CLASS_NAME, "playbutton" + ) + return "playing" in playbtn.get_attribute("class") + except NoSuchElementException: + logging.error("Play button not found in ._is_playing()") + return False + + def _play_next(self): + """Play the next available track.""" + self._current_track_number += 1 + if self._current_track_number <= len(self.playable_tracks): + self.play(self._current_track_number) + else: + self.get_new_tracks() + self.play(1) + + # Private Methods - Track Management + def _fetch_tracks(self): + """Query the page to populate a list of available tracks.""" + with self._lock: + try: + WebDriverWait(self.browser, 10).until( + EC.presence_of_all_elements_located( + (By.CLASS_NAME, "discover-item") + ) + ) + all_tracks = self.browser.find_elements( + By.CLASS_NAME, "discover-item" + ) + return [track for track in all_tracks if track.is_displayed()] + except Exception as e: + logging.error(f"Error fetching tracks: {e}") + return [] + + def _show_track_info(self, track_number: int): + """Show information about a specific track. + + Args: + track_number (int): The track number to display information about. 
+ """ + try: + track = self.playable_tracks[track_number - 1] + album, artist, *genre = track.text.split("\n") + info = ( + f"[{track_number}]\n" + f"Album : {album}\n" + f"Artist : {artist}\n" + f"Genre : {genre[0]}\n" + if genre + else "\n" + ) + print(info) + except IndexError: + logging.error(f"Track number {track_number} is out of range") + + +if __name__ == "__main__": + with BandLeader(csv_path="band_db.csv") as band_leader: + band_leader.stream() diff --git a/selenium-web-automation/old/requirements.txt b/selenium-web-automation/old/requirements.txt new file mode 100644 index 0000000000..30380d2daa --- /dev/null +++ b/selenium-web-automation/old/requirements.txt @@ -0,0 +1,15 @@ +attrs==24.2.0 +certifi==2024.7.4 +h11==0.14.0 +idna==3.7 +outcome==1.3.0.post0 +PySocks==1.7.1 +selenium==4.23.1 +sniffio==1.3.1 +sortedcontainers==2.4.0 +trio==0.26.1 +trio-websocket==0.11.1 +typing_extensions==4.12.2 +urllib3==2.2.2 +websocket-client==1.8.0 +wsproto==1.2.0