Skip to content

Commit

Permalink
Update to retrieve key metadata
Browse files Browse the repository at this point in the history
This builds upon iiab#142, progressing towards implementation of the "top 100" feature.
  • Loading branch information
deldesir authored Mar 27, 2024
1 parent 67842b6 commit 3f7e9de
Showing 1 changed file with 119 additions and 114 deletions.
233 changes: 119 additions & 114 deletions cps/tasks/metadata_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,140 +18,145 @@ class TaskMetadataExtract(CalibreTask):
def __init__(self, task_message, media_url, original_url, current_user_name):
    """Initialize the metadata-extraction task.

    task_message: human-readable status text shown in the task list.
    media_url: URL of the media (video/playlist/channel) to inspect.
    original_url: callback URL of the calling endpoint; its "/media"
        path segment is rewritten to "/meta" (see _format_original_url).
    current_user_name: name of the user who queued the task.
    """
    super(TaskMetadataExtract, self).__init__(task_message)
    self.message = task_message
    # NOTE: diff residue in the source assigned media_url/original_url twice;
    # only the normalized (new-version) assignments are kept here.
    self.media_url = self._format_media_url(media_url)
    self.media_url_link = f'<a href="{self.media_url}" target="_blank">{self.media_url}</a>'
    self.original_url = self._format_original_url(original_url)
    self.type_of_url = self._get_type_of_url(self.media_url)
    self.current_user_name = current_user_name
    self.start_time = self.end_time = datetime.now()
    self.stat = STAT_WAITING
    self.progress = 0
    # Populated later from PRAGMA table_info(media).
    self.columns = None
    # Shelf metadata filled in while the task runs.
    self.shelf_title = None
    self.shelf_id = None
    self.playlist_id = None
    self.main_message = None

def _format_media_url(self, media_url):
return media_url.split("&")[0] if "&" in media_url else media_url

def _format_original_url(self, original_url):
# (?=...) is a "lookahead assertion" https://docs.python.org/3/library/re.html#regular-expression-syntax
return re.sub(r"/media(?=\?|$)", r"/meta", original_url)

def _get_type_of_url(self, media_url):
if "list=" in media_url:
return "playlist"
elif "@" in media_url:
return "channel"
else:
return "video"

def _execute_subprocess(self, subprocess_args):
    """Run the given command via process_open and wait for it to finish.

    Returns the completed process handle, or None if launching/waiting failed
    (in which case self.message is set to the error text).
    """
    try:
        proc = process_open(subprocess_args, newlines=True)
        proc.wait()
        self.message = self.media_url_link + "..."
    except Exception as e:
        log.error("An error occurred during subprocess execution: %s", e)
        self.message = f"{self.media_url_link} failed: {e}"
        return None
    return proc

def _fetch_requested_urls(self, conn):
try:
cursor = conn.execute("PRAGMA table_info(media)")
self.columns = [column[1] for column in cursor.fetchall()]
query = ("SELECT path, duration FROM media WHERE error IS NULL AND path LIKE 'http%'"
if "error" in self.columns
else "SELECT path, duration FROM media WHERE path LIKE 'http%'")
rows = conn.execute(query).fetchall()
return {row[0]: {"duration": row[1], "is_playlist_video": self._is_playlist_video(row[0], conn)} for row in rows}
except sqlite3.Error as db_error:
log.error("An error occurred while trying to connect to the database: %s", db_error)
self.message = f"{self.media_url_link} failed: {db_error}"
return {}

def _is_playlist_video(self, path, conn):
try:
return bool(conn.execute("SELECT playlists_id FROM media WHERE path = ?", (path,)).fetchone())
except sqlite3.Error as db_error:
log.error("An error occurred while trying to connect to the database: %s", db_error)
return False

def _get_shelf_title(self, conn):
url_part = self.media_url.split("/")[-1]
if "list=" in url_part:
url_part = url_part.split("list=")[-1]
try:
self.shelf_title = conn.execute("SELECT title FROM playlists WHERE extractor_playlist_id = ?", (url_part,)).fetchone()[0]
except sqlite3.Error as db_error:
log.error("An error occurred while trying to connect to the database: %s", db_error)
elif "@" in url_part:
self.shelf_title = url_part.split("@")[-1]
else:
self.shelf_title = "Unnamed Bookshelf"

def _send_shelf_title(self):
    """Report the shelf title back to the originating endpoint and record the shelf id.

    On HTTP 200 the returned JSON's "shelf_id" is stored; any other status or
    exception is only logged (best-effort, no retry).
    """
    try:
        resp = requests.get(self.original_url, params={"current_user_name": self.current_user_name, "shelf_title": self.shelf_title})
        if resp.status_code == 200:
            self.shelf_id = resp.json()["shelf_id"]
        else:
            log.error("Received unexpected status code %s while sending the shelf title to %s", resp.status_code, self.original_url)
    except Exception as e:
        log.error("An error occurred during the shelf title sending: %s", e)

def _update_metadata(self, requested_urls):
failed_urls = []
subprocess_args_list = [[os.getenv("LB_WRAPPER", "lb-wrapper"), "tubeadd", requested_url] for requested_url in requested_urls.keys()]

for index, subprocess_args in enumerate(subprocess_args_list):
try:
p = self._execute_subprocess(subprocess_args)
if p is not None:
self.progress = (index + 1) / len(subprocess_args_list)
else:
failed_urls.append(subprocess_args[2])
p.wait()
except Exception as e:
log.error("An error occurred during updating the metadata of %s: %s", subprocess_args[2], e)
self.message = f"{subprocess_args[2]} failed: {e}"
failed_urls.append(subprocess_args[2])

def _add_download_tasks_to_worker(self, requested_urls):
    """Queue a TaskDownload per requested URL and publish a summary message.

    The message shows the video count and total duration (HH:MM:SS).
    """
    # BUG FIX: the previous code referenced the loop variable `index` after the
    # loop, raising NameError when requested_urls was empty.
    if not requested_urls:
        return
    num_requested_urls = len(requested_urls)
    for requested_url in requested_urls:
        task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
                                     requested_url, self.original_url,
                                     self.current_user_name, self.shelf_id)
        WorkerThread.add(self.current_user_name, task_download)
    # BUG FIX: skip NULL durations instead of crashing on None + int (the older
    # inline implementation guarded `duration is not None` before summing).
    total_duration = sum(url_data["duration"] for url_data in requested_urls.values()
                         if url_data["duration"] is not None)
    # After a complete loop `index + 1 == num_requested_urls`, so the count is
    # rendered directly.
    self.message = self.media_url_link + f"<br><br>Number of Videos: {num_requested_urls}/{num_requested_urls}<br>Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"

def run(self, worker_thread):
"""Run the metadata fetching task.

NOTE(review): the body below was captured from a commit-diff page that lost
its +/- markers, so what appears to be the OLD run() implementation (removed
by this commit) and the NEW one (added by it) are interleaved.  All original
lines are kept byte-for-byte; comments only mark which variant each region
appears to belong to.  This span is NOT valid as a single method as-is.
"""
self.worker_thread = worker_thread
log.info("Starting to fetch metadata for URL: %s", self.media_url)
self.start_time = self.end_time = datetime.now()
self.stat = STAT_STARTED
self.progress = 0

lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
subprocess_args = [lb_executable, "tubeadd", self.media_url]

if self.media_url:
# NOTE(review): this "&" stripping duplicates _format_media_url(), which the
# new __init__ already applies — presumably leftover from the old version.
if "&" in self.media_url:
self.media_url = self.media_url.split("&")[0]
subprocess_args = [lb_executable, "tubeadd", self.media_url]
log.info("Subprocess args: %s", subprocess_args)
p = self._execute_subprocess(subprocess_args)
if p is None:
self.stat = STAT_FAIL
return

# ---- appears to be OLD implementation (removed by this commit) ----
# Execute the download process using process_open
try:
p = process_open(subprocess_args, newlines=True)

p.wait()
self_main_message = f"{self.media_url_link}"
self.message = self_main_message

# Database operations
requested_urls = {}
with sqlite3.connect(XKLB_DB_FILE) as conn:
try:
cursor = conn.execute("PRAGMA table_info(media)")
self.columns = [column[1] for column in cursor.fetchall()]
if "error" in self.columns:
rows = conn.execute("SELECT path, duration FROM media WHERE error IS NULL AND path LIKE 'http%'").fetchall()
else:
rows = conn.execute("SELECT path, duration FROM media WHERE path LIKE 'http%'").fetchall()

# Abort if there are no urls
if not rows:
log.info("No urls found in the database")
error = conn.execute("SELECT error, webpath FROM media WHERE error IS NOT NULL AND webpath = ?", (self.media_url,)).fetchone()
if error:
log.error("[xklb] An error occurred while trying to retrieve the data for %s: %s", error[1], error[0])
self.progress = 0
self.message = f"{error[1]} gave no data : {error[0]}"
return

for row in rows:
path = row[0]
duration = row[1]
is_playlist_video = False
if "playlists_id" in self.columns:
playlist_id = conn.execute("SELECT playlists_id FROM media WHERE path = ?", (path,)).fetchone()
if playlist_id:
is_playlist_video = True
requested_urls[path] = {
"duration": duration,
"is_playlist_video": is_playlist_video
}

except sqlite3.Error as db_error:
log.error("An error occurred while trying to connect to the database: %s", db_error)
self.message = f"{self.media_url_link} failed: {db_error}"

# get the shelf title
if any([requested_urls[url]["is_playlist_video"] for url in requested_urls.keys()]):
try:
self.playlist_id = self.media_url.split("/")[-1]
if "list=" in self.playlist_id:
self.playlist_id = self.playlist_id.split("list=")[-1]
self.shelf_title = conn.execute("SELECT title FROM playlists WHERE extractor_playlist_id = ?", (self.playlist_id,)).fetchone()[0]
elif "@" in self.playlist_id:
self.shelf_title = self.playlist_id.split("@")[-1]
else:
self.shelf_title = "Unnamed Bookshelf"
except sqlite3.Error as db_error:
if "no such table: playlists" in str(db_error):
log.info("No playlists table found in the database")
self.playlist_id = None
else:
log.error("An error occurred while trying to connect to the database: %s", db_error)
self.message = f"{self.media_url_link} failed to download: {db_error}"
self.progress = 0
finally:
log.info("Shelf title: %s", self.shelf_title)

conn.close()

if self.shelf_title:
response = requests.get(self.original_url, params={"current_user_name": self.current_user_name, "shelf_title": self.shelf_title})
if response.status_code == 200:
self.shelf_id = response.json()["shelf_id"]
else:
log.error("An error occurred while trying to send the shelf title to %s", self.original_url)

num_requested_urls = len(requested_urls.keys())
total_duration = 0

for index, requested_url in enumerate(requested_urls.keys()):
task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
requested_url, self.original_url,
self.current_user_name, self.shelf_id
)
WorkerThread.add(self.current_user_name, task_download)

self.progress = (index + 1) / num_requested_urls
if requested_urls[requested_url]["duration"] is not None:
total_duration += requested_urls[requested_url]["duration"]
self.message = self_main_message + f"<br><br>Number of Videos: {index + 1}/{num_requested_urls}<br>Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
# ---- appears to be NEW implementation (added by this commit) ----
with sqlite3.connect(XKLB_DB_FILE) as conn:
requested_urls = self._fetch_requested_urls(conn)
if not requested_urls:
return

# ---- appears to be OLD residue: handler of the removed try-block ----
except Exception as e:
log.error("An error occurred during the subprocess execution: %s", e)
self.message = f"{self.media_url_link} failed: {e}"
# ---- appears to be NEW implementation ----
if self.type_of_url != "video":
self._get_shelf_title(conn)
if any([requested_urls[url]["is_playlist_video"] for url in requested_urls.keys()]):
self._send_shelf_title()
self._update_metadata(requested_urls)

# ---- appears to be OLD residue: finally-block of the removed try ----
finally:
if p.returncode == 0 or self.progress == 1.0:
self.stat = STAT_FINISH_SUCCESS
else:
self.stat = STAT_FAIL
# ---- appears to be NEW implementation ----
self._add_download_tasks_to_worker(requested_urls)
conn.close()

else:
log.info("No media URL provided - skipping download task")
self.stat = STAT_FINISH_SUCCESS

@property
def name(self):
Expand All @@ -162,4 +167,4 @@ def __str__(self):

@property
def is_cancellable(self):
    """Metadata-extraction tasks can be cancelled by the user."""
    # Diff residue showed two `return True` lines (old + new); one is kept.
    return True

0 comments on commit 3f7e9de

Please sign in to comment.