diff --git a/cps/tasks/metadata_extract.py b/cps/tasks/metadata_extract.py
index 9abe09425..49cf10568 100644
--- a/cps/tasks/metadata_extract.py
+++ b/cps/tasks/metadata_extract.py
@@ -18,10 +18,10 @@ class TaskMetadataExtract(CalibreTask):
def __init__(self, task_message, media_url, original_url, current_user_name):
super(TaskMetadataExtract, self).__init__(task_message)
self.message = task_message
- self.media_url = media_url
- self.media_url_link = f'{media_url}'
- # (?=...) is a "lookahead assertion" https://docs.python.org/3/library/re.html#regular-expression-syntax
- self.original_url = re.sub(r"/media(?=\?|$)", r"/meta", original_url)
+ self.media_url = self._format_media_url(media_url)
+ self.media_url_link = f'{self.media_url}'
+ self.original_url = self._format_original_url(original_url)
+ self.type_of_url = self._get_type_of_url(self.media_url)
self.current_user_name = current_user_name
self.start_time = self.end_time = datetime.now()
self.stat = STAT_WAITING
@@ -29,11 +29,88 @@ def __init__(self, task_message, media_url, original_url, current_user_name):
self.columns = None
self.shelf_title = None
self.shelf_id = None
- self.playlist_id = None
- self.main_message = None
+ def _format_media_url(self, media_url):
+ return media_url.split("&")[0] if "&" in media_url else media_url
+ def _format_original_url(self, original_url):
+ # (?=...) is a "lookahead assertion" https://docs.python.org/3/library/re.html#regular-expression-syntax
+ return re.sub(r"/media(?=\?|$)", r"/meta", original_url)
+ def _get_type_of_url(self, media_url):
+ if "list=" in media_url:
+ return "playlist"
+ elif "@" in media_url:
+ return "channel"
+ else:
+ return "video"
+ def _execute_subprocess(self, subprocess_args):
+ try:
+ p = process_open(subprocess_args, newlines=True)
+ p.wait()
+ self.message = self.media_url_link + "..."
+ return p
+ except Exception as e:
+ log.error("An error occurred during subprocess execution: %s", e)
+ self.message = f"{self.media_url_link} failed: {e}"
+ return None
+ def _fetch_requested_urls(self, conn):
+ try:
+ cursor = conn.execute("PRAGMA table_info(media)")
+ self.columns = [column[1] for column in cursor.fetchall()]
+ query = ("SELECT path, duration FROM media WHERE error IS NULL AND path LIKE 'http%'"
+ if "error" in self.columns
+ else "SELECT path, duration FROM media WHERE path LIKE 'http%'")
+ rows = conn.execute(query).fetchall()
+ return {row[0]: {"duration": row[1], "is_playlist_video": self._is_playlist_video(row[0], conn)} for row in rows}
+ except sqlite3.Error as db_error:
+ log.error("An error occurred while trying to connect to the database: %s", db_error)
+ self.message = f"{self.media_url_link} failed: {db_error}"
+ return {}
+ def _is_playlist_video(self, path, conn):
+ try:
+ return bool(conn.execute("SELECT playlists_id FROM media WHERE path = ?", (path,)).fetchone())
+ except sqlite3.Error as db_error:
+ log.error("An error occurred while trying to connect to the database: %s", db_error)
+ return False
+ def _get_shelf_title(self, conn):
+ url_part = self.media_url.split("/")[-1]
+ if "list=" in url_part:
+ url_part = url_part.split("list=")[-1]
+ try:
+ self.shelf_title = conn.execute("SELECT title FROM playlists WHERE extractor_playlist_id = ?", (url_part,)).fetchone()[0]
+ except sqlite3.Error as db_error:
+ log.error("An error occurred while trying to connect to the database: %s", db_error)
+ elif "@" in url_part:
+ self.shelf_title = url_part.split("@")[-1]
+ else:
+ self.shelf_title = "Unnamed Bookshelf"
+ def _send_shelf_title(self):
+ try:
+ response = requests.get(self.original_url, params={"current_user_name": self.current_user_name, "shelf_title": self.shelf_title})
+ if response.status_code == 200:
+ self.shelf_id = response.json()["shelf_id"]
+ else:
+ log.error("Received unexpected status code %s while sending the shelf title to %s", response.status_code, self.original_url)
+ except Exception as e:
+ log.error("An error occurred during the shelf title sending: %s", e)
+ def _add_download_tasks_to_worker(self, requested_urls):
+ for index, requested_url in enumerate(requested_urls.keys()):
+ task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
+ requested_url, self.original_url,
+ self.current_user_name, self.shelf_id)
+ WorkerThread.add(self.current_user_name, task_download)
+ num_requested_urls = len(requested_urls)
+ total_duration = sum(url_data["duration"] for url_data in requested_urls.values())
+ self.message = self.media_url_link + f"
Number of Videos: {index + 1}/{num_requested_urls}
Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
def run(self, worker_thread):
- """Run the metadata fetching task"""
self.worker_thread = worker_thread
log.info("Starting to fetch metadata for URL: %s", self.media_url)
self.start_time = self.end_time = datetime.now()
@@ -41,117 +118,27 @@ def run(self, worker_thread):
self.progress = 0
lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
+ subprocess_args = [lb_executable, "tubeadd", self.media_url]
- if self.media_url:
- if "&" in self.media_url:
- self.media_url = self.media_url.split("&")[0]
- subprocess_args = [lb_executable, "tubeadd", self.media_url]
- log.info("Subprocess args: %s", subprocess_args)
+ p = self._execute_subprocess(subprocess_args)
+ if p is None:
+ self.stat = STAT_FAIL
+ return
- # Execute the download process using process_open
- try:
- p = process_open(subprocess_args, newlines=True)
- p.wait()
- self_main_message = f"{self.media_url_link}"
- self.message = self_main_message
- # Database operations
- requested_urls = {}
- with sqlite3.connect(XKLB_DB_FILE) as conn:
- try:
- cursor = conn.execute("PRAGMA table_info(media)")
- self.columns = [column[1] for column in cursor.fetchall()]
- if "error" in self.columns:
- rows = conn.execute("SELECT path, duration FROM media WHERE error IS NULL AND path LIKE 'http%'").fetchall()
- else:
- rows = conn.execute("SELECT path, duration FROM media WHERE path LIKE 'http%'").fetchall()
- # Abort if there are no urls
- if not rows:
- log.info("No urls found in the database")
- error = conn.execute("SELECT error, webpath FROM media WHERE error IS NOT NULL AND webpath = ?", (self.media_url,)).fetchone()
- if error:
- log.error("[xklb] An error occurred while trying to retrieve the data for %s: %s", error[1], error[0])
- self.progress = 0
- self.message = f"{error[1]} gave no data : {error[0]}"
- return
- for row in rows:
- path = row[0]
- duration = row[1]
- is_playlist_video = False
- if "playlists_id" in self.columns:
- playlist_id = conn.execute("SELECT playlists_id FROM media WHERE path = ?", (path,)).fetchone()
- if playlist_id:
- is_playlist_video = True
- requested_urls[path] = {
- "duration": duration,
- "is_playlist_video": is_playlist_video
- }
- except sqlite3.Error as db_error:
- log.error("An error occurred while trying to connect to the database: %s", db_error)
- self.message = f"{self.media_url_link} failed: {db_error}"
- # get the shelf title
- if any([requested_urls[url]["is_playlist_video"] for url in requested_urls.keys()]):
- try:
- self.playlist_id = self.media_url.split("/")[-1]
- if "list=" in self.playlist_id:
- self.playlist_id = self.playlist_id.split("list=")[-1]
- self.shelf_title = conn.execute("SELECT title FROM playlists WHERE extractor_playlist_id = ?", (self.playlist_id,)).fetchone()[0]
- elif "@" in self.playlist_id:
- self.shelf_title = self.playlist_id.split("@")[-1]
- else:
- self.shelf_title = "Unnamed Bookshelf"
- except sqlite3.Error as db_error:
- if "no such table: playlists" in str(db_error):
- log.info("No playlists table found in the database")
- self.playlist_id = None
- else:
- log.error("An error occurred while trying to connect to the database: %s", db_error)
- self.message = f"{self.media_url_link} failed to download: {db_error}"
- self.progress = 0
- finally:
- log.info("Shelf title: %s", self.shelf_title)
- conn.close()
- if self.shelf_title:
- response = requests.get(self.original_url, params={"current_user_name": self.current_user_name, "shelf_title": self.shelf_title})
- if response.status_code == 200:
- self.shelf_id = response.json()["shelf_id"]
- else:
- log.error("An error occurred while trying to send the shelf title to %s", self.original_url)
- num_requested_urls = len(requested_urls.keys())
- total_duration = 0
- for index, requested_url in enumerate(requested_urls.keys()):
- task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
- requested_url, self.original_url,
- self.current_user_name, self.shelf_id
- )
- WorkerThread.add(self.current_user_name, task_download)
- self.progress = (index + 1) / num_requested_urls
- if requested_urls[requested_url]["duration"] is not None:
- total_duration += requested_urls[requested_url]["duration"]
- self.message = self_main_message + f"
Number of Videos: {index + 1}/{num_requested_urls}
Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
- except Exception as e:
- log.error("An error occurred during the subprocess execution: %s", e)
- self.message = f"{self.media_url_link} failed: {e}"
- finally:
- if p.returncode == 0 or self.progress == 1.0:
- else:
- self.stat = STAT_FAIL
+ with sqlite3.connect(XKLB_DB_FILE) as conn:
+ requested_urls = self._fetch_requested_urls(conn)
+ if not requested_urls:
+ return
- else:
- log.info("No media URL provided - skipping download task")
+ if self.type_of_url != "video":
+ self._get_shelf_title(conn)
+ if any([requested_urls[url]["is_playlist_video"] for url in requested_urls.keys()]):
+ self._send_shelf_title()
+ self._add_download_tasks_to_worker(requested_urls)
+ conn.close()
def name(self):