class TaskMetadataExtract(CalibreTask):
    """Fetch metadata for a media URL via the xklb/lb-wrapper toolchain.

    The task runs ``lb-wrapper tubeadd`` for the requested URL, reads the
    resulting rows out of the xklb SQLite database, registers a bookshelf
    for playlist/channel URLs with the web app, refreshes per-video
    metadata, and finally enqueues one :class:`TaskDownload` per video.
    """

    def __init__(self, task_message, media_url, original_url, current_user_name):
        super(TaskMetadataExtract, self).__init__(task_message)
        self.message = task_message
        self.media_url = self._format_media_url(media_url)
        self.media_url_link = f'{self.media_url}'
        self.original_url = self._format_original_url(original_url)
        self.type_of_url = self._get_type_of_url(self.media_url)
        self.current_user_name = current_user_name
        self.start_time = self.end_time = datetime.now()
        self.stat = STAT_WAITING
        self.progress = 0
        self.columns = None         # media-table column names, filled by _fetch_requested_urls
        self.shelf_title = None     # resolved by _get_shelf_title for playlist/channel URLs
        self.shelf_id = None        # returned by the web app in _send_shelf_title

    def _format_media_url(self, media_url):
        # Keep only the base URL: drop extra query parameters such as "&t=42s".
        return media_url.split("&")[0]

    def _format_original_url(self, original_url):
        # (?=...) is a "lookahead assertion":
        # https://docs.python.org/3/library/re.html#regular-expression-syntax
        # Rewrite ".../media" (at end of path or before a query string) to ".../meta".
        return re.sub(r"/media(?=\?|$)", r"/meta", original_url)

    def _get_type_of_url(self, media_url):
        """Classify the URL so run() knows whether a shelf is needed."""
        if "list=" in media_url:
            return "playlist"
        if "@" in media_url:
            return "channel"
        return "video"

    def _execute_subprocess(self, subprocess_args):
        """Run lb-wrapper synchronously.

        Returns the finished process object, or None on failure (the error
        is logged and reflected in self.message).
        """
        try:
            p = process_open(subprocess_args, newlines=True)
            p.wait()
            self.message = self.media_url_link + "..."
            return p
        except Exception as e:
            log.error("An error occurred during subprocess execution: %s", e)
            self.message = f"{self.media_url_link} failed: {e}"
            return None

    def _fetch_requested_urls(self, conn):
        """Return {url: {"duration": ..., "is_playlist_video": ...}} for every
        http(s) entry in the xklb media table, or {} on database error."""
        try:
            cursor = conn.execute("PRAGMA table_info(media)")
            self.columns = [column[1] for column in cursor.fetchall()]
            # Older xklb databases have no "error" column; fall back gracefully.
            query = ("SELECT path, duration FROM media WHERE error IS NULL AND path LIKE 'http%'"
                     if "error" in self.columns
                     else "SELECT path, duration FROM media WHERE path LIKE 'http%'")
            rows = conn.execute(query).fetchall()
            return {path: {"duration": duration,
                           "is_playlist_video": self._is_playlist_video(path, conn)}
                    for path, duration in rows}
        except sqlite3.Error as db_error:
            log.error("An error occurred while trying to connect to the database: %s", db_error)
            self.message = f"{self.media_url_link} failed: {db_error}"
            return {}

    def _is_playlist_video(self, path, conn):
        """True when the media row is linked to a playlist (playlists_id is set)."""
        try:
            row = conn.execute("SELECT playlists_id FROM media WHERE path = ?", (path,)).fetchone()
            # fetchone() returns (None,) for a row with a NULL playlists_id,
            # which is truthy — check the column value, not just row existence.
            return row is not None and row[0] is not None
        except sqlite3.Error as db_error:
            log.error("An error occurred while trying to connect to the database: %s", db_error)
            return False

    def _get_shelf_title(self, conn):
        """Derive the shelf title from the playlist/channel part of the media URL."""
        url_part = self.media_url.split("/")[-1]
        if "list=" in url_part:
            playlist_id = url_part.split("list=")[-1]
            try:
                row = conn.execute("SELECT title FROM playlists WHERE extractor_playlist_id = ?",
                                   (playlist_id,)).fetchone()
                # fetchone() returns None when the playlist is unknown — fall back
                # instead of raising TypeError on row[0].
                self.shelf_title = row[0] if row else "Unnamed Bookshelf"
            except sqlite3.Error as db_error:
                log.error("An error occurred while trying to connect to the database: %s", db_error)
                self.shelf_title = "Unnamed Bookshelf"
        elif "@" in url_part:
            self.shelf_title = url_part.split("@")[-1]
        else:
            self.shelf_title = "Unnamed Bookshelf"

    def _send_shelf_title(self):
        """Report the shelf title back to the web app and store the shelf id it returns."""
        try:
            response = requests.get(self.original_url,
                                    params={"current_user_name": self.current_user_name,
                                            "shelf_title": self.shelf_title})
            if response.status_code == 200:
                self.shelf_id = response.json()["shelf_id"]
            else:
                log.error("Received unexpected status code %s while sending the shelf title to %s",
                          response.status_code, self.original_url)
        except Exception as e:
            log.error("An error occurred during the shelf title sending: %s", e)

    def _update_metadata(self, requested_urls):
        """Run ``lb-wrapper tubeadd`` for every requested URL.

        Returns the list of URLs whose metadata update failed.
        """
        failed_urls = []
        lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
        for index, requested_url in enumerate(requested_urls):
            try:
                p = self._execute_subprocess([lb_executable, "tubeadd", requested_url])
                # _execute_subprocess already waited on the process; do not call
                # p.wait() here — p is None on failure and would crash.
                if p is not None:
                    self.progress = (index + 1) / len(requested_urls)
                else:
                    failed_urls.append(requested_url)
            except Exception as e:
                log.error("An error occurred during updating the metadata of %s: %s", requested_url, e)
                self.message = f"{requested_url} failed: {e}"
                failed_urls.append(requested_url)
        return failed_urls

    def _add_download_tasks_to_worker(self, requested_urls):
        """Queue one TaskDownload per URL and publish a summary message."""
        if not requested_urls:
            return
        for requested_url in requested_urls:
            task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
                                         requested_url, self.original_url,
                                         self.current_user_name, self.shelf_id)
            WorkerThread.add(self.current_user_name, task_download)
        num_requested_urls = len(requested_urls)
        # Some rows may not have a duration yet — skip None instead of raising.
        total_duration = sum(data["duration"] for data in requested_urls.values()
                             if data["duration"] is not None)
        self.message = (
            self.media_url_link
            + f"\n\nNumber of Videos: {num_requested_urls}/{num_requested_urls}"
            + f"\nTotal Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
        )

    def run(self, worker_thread):
        """Run the metadata fetching task."""
        self.worker_thread = worker_thread
        log.info("Starting to fetch metadata for URL: %s", self.media_url)
        self.start_time = self.end_time = datetime.now()
        self.progress = 0

        lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
        subprocess_args = [lb_executable, "tubeadd", self.media_url]

        p = self._execute_subprocess(subprocess_args)
        if p is None:
            self.stat = STAT_FAIL
            return

        # NOTE: sqlite3's "with conn:" only manages the transaction, not the
        # connection lifetime, so close explicitly (on every path).
        conn = sqlite3.connect(XKLB_DB_FILE)
        try:
            requested_urls = self._fetch_requested_urls(conn)
            if not requested_urls:
                # Nothing usable came back (or a DB error already set self.message);
                # mark the task terminal instead of leaving it hanging.
                log.info("No urls found in the database")
                self.stat = STAT_FAIL
                return

            if self.type_of_url != "video":
                self._get_shelf_title(conn)
                if any(data["is_playlist_video"] for data in requested_urls.values()):
                    self._send_shelf_title()

            failed_urls = self._update_metadata(requested_urls)
            if failed_urls:
                log.error("Metadata update failed for: %s", failed_urls)

            self._add_download_tasks_to_worker(requested_urls)
        finally:
            conn.close()

        self.stat = STAT_FINISH_SUCCESS

    # NOTE(review): the `name` property and `__str__` are unchanged by this patch
    # and their bodies are not visible in the hunks shown here, so they are not
    # reproduced — confirm against the full file.

    @property
    def is_cancellable(self):
        return True