diff --git a/Dockerfile b/Dockerfile index 7e3d6040c1..0ab02601ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -153,6 +153,12 @@ COPY ./docker/services/spotify_reader/spotify_reader.service /etc/service/spotif COPY ./docker/services/spotify_reader/spotify_reader.finish /etc/service/spotify_reader/finish RUN touch /etc/service/spotify_reader/down +# Last.fm importer +COPY ./docker/services/lastfm_importer/consul-template-lastfm-importer.conf /etc/consul-template-lastfm-importer.conf +COPY ./docker/services/lastfm_importer/lastfm_importer.service /etc/service/lastfm_importer/run +COPY ./docker/services/lastfm_importer/lastfm_importer.finish /etc/service/lastfm_importer/finish +RUN touch /etc/service/lastfm_importer/down + # Timescale writer COPY ./docker/services/timescale_writer/consul-template-timescale-writer.conf /etc/consul-template-timescale-writer.conf COPY ./docker/services/timescale_writer/timescale_writer.service /etc/service/timescale_writer/run diff --git a/admin/sql/create_tables.sql b/admin/sql/create_tables.sql index ce21f0f34e..fa3ad2bab7 100644 --- a/admin/sql/create_tables.sql +++ b/admin/sql/create_tables.sql @@ -126,7 +126,7 @@ CREATE TABLE external_service_oauth ( user_id INTEGER NOT NULL, -- FK to "user".id external_user_id TEXT, service external_service_oauth_type NOT NULL, - access_token TEXT NOT NULL, + access_token TEXT, refresh_token TEXT, token_expires TIMESTAMP WITH TIME ZONE, last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(), diff --git a/admin/sql/updates/2024-08-02-make-external-service-access-token-nullable.sql b/admin/sql/updates/2024-08-02-make-external-service-access-token-nullable.sql new file mode 100644 index 0000000000..e087bc95e6 --- /dev/null +++ b/admin/sql/updates/2024-08-02-make-external-service-access-token-nullable.sql @@ -0,0 +1,5 @@ +BEGIN; + +ALTER TABLE external_service_oauth ALTER COLUMN access_token DROP NOT NULL; + +COMMIT; diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 1bb8584df0..191dd9af0e 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -106,13 +106,24 @@ services: image: web volumes: - ..:/code/listenbrainz:z - command: python3 -m "listenbrainz.spotify_updater.spotify_read_listens" + command: python3 -m "listenbrainz.listens_importer.spotify" user: "${LB_DOCKER_USER:-root}:${LB_DOCKER_GROUP:-root}" depends_on: - redis - lb_db - rabbitmq + lastfm_importer: + image: web + volumes: + - ..:/code/listenbrainz:z + command: python3 -m "listenbrainz.listens_importer.lastfm" + user: "${LB_DOCKER_USER:-root}:${LB_DOCKER_GROUP:-root}" + depends_on: + - redis + - rabbitmq + - lb_db + websockets: image: web volumes: diff --git a/docker/rc.local b/docker/rc.local index da86833086..c7127bc8cc 100755 --- a/docker/rc.local +++ b/docker/rc.local @@ -43,6 +43,13 @@ then rm -f /etc/service/spotify_reader/down fi +if [ "${CONTAINER_NAME}" = "listenbrainz-lastfm-reader-${DEPLOY_ENV}" ] +then + log Enabling last.fm importer + rm -f /etc/service/lastfm_importer/down +fi + + if [ "${CONTAINER_NAME}" = "listenbrainz-spark-reader-${DEPLOY_ENV}" ] then log Enabling spark reader diff --git a/docker/services/lastfm_importer/consul-template-lastfm-importer.conf b/docker/services/lastfm_importer/consul-template-lastfm-importer.conf new file mode 100644 index 0000000000..0806b3b658 --- /dev/null +++ b/docker/services/lastfm_importer/consul-template-lastfm-importer.conf @@ -0,0 +1,12 @@ +template { + source = "/code/listenbrainz/consul_config.py.ctmpl" + destination = "/code/listenbrainz/listenbrainz/config.py" +} + +exec { + command = ["run-lb-command", "python3", "-m", "listenbrainz.listens_importer.lastfm"] + splay = "5s" + reload_signal = "SIGHUP" + kill_signal = "SIGTERM" + kill_timeout = "30s" +} diff --git a/docker/services/lastfm_importer/lastfm_importer.finish b/docker/services/lastfm_importer/lastfm_importer.finish new file mode 100755 index 0000000000..4fae09302e --- /dev/null +++ b/docker/services/lastfm_importer/lastfm_importer.finish @@ -0,0 +1,17 @@ +#!/bin/bash + +export service="lastfm-importer" + +. /etc/lb-startup-common.sh + + +generate_message "$service" "$@" + +log "$message" + +send_sentry_message "$message" + +if [ "$1" != "0" ]; then + log "Exited with non-0 status, sleeping 10 seconds" + sleep 10 +fi diff --git a/docker/services/lastfm_importer/lastfm_importer.service b/docker/services/lastfm_importer/lastfm_importer.service new file mode 100755 index 0000000000..fa7a46135a --- /dev/null +++ b/docker/services/lastfm_importer/lastfm_importer.service @@ -0,0 +1,4 @@ +#!/bin/bash + +sleep 1 +exec run-consul-template -config /etc/consul-template-lastfm-importer.conf diff --git a/docker/services/spotify_reader/consul-template-spotify-reader.conf b/docker/services/spotify_reader/consul-template-spotify-reader.conf index d823467cfa..f7776b4f87 100644 --- a/docker/services/spotify_reader/consul-template-spotify-reader.conf +++ b/docker/services/spotify_reader/consul-template-spotify-reader.conf @@ -4,7 +4,7 @@ template { } exec { - command = ["run-lb-command", "python3", "-m", "listenbrainz.spotify_updater.spotify_read_listens"] + command = ["run-lb-command", "python3", "-m", "listenbrainz.listens_importer.spotify"] splay = "5s" reload_signal = "SIGHUP" kill_signal = "SIGTERM" diff --git a/frontend/css/music-services.less b/frontend/css/music-services.less index 472ca4f243..7d2a5fe400 100644 --- a/frontend/css/music-services.less +++ b/frontend/css/music-services.less @@ -1,6 +1,11 @@ @checkbox-size: 40px; .music-service-selection { + button.music-service-option{ + border: none; + background: none; + padding: 0; + } .music-service-option { input[type="radio"] { display: none; diff --git a/frontend/js/src/settings/music-services/details/MusicServices.tsx b/frontend/js/src/settings/music-services/details/MusicServices.tsx index cf80cc0c81..48bf382ea5 100644 --- a/frontend/js/src/settings/music-services/details/MusicServices.tsx +++ b/frontend/js/src/settings/music-services/details/MusicServices.tsx @@ -18,6 +18,7 @@ type MusicServicesLoaderData = { current_critiquebrainz_permissions: string; current_soundcloud_permissions: string; current_apple_permissions: string; + current_lastfm_permissions: string; }; export default function MusicServices() { @@ -34,6 +35,7 @@ export default function MusicServices() { critiquebrainz: loaderData.current_critiquebrainz_permissions, soundcloud: loaderData.current_soundcloud_permissions, appleMusic: loaderData.current_apple_permissions, + lastFm: loaderData.current_lastfm_permissions, }); const handlePermissionChange = async ( @@ -161,6 +163,56 @@ export default function MusicServices() { } }; + const handleConnectToLaftFM = async ( + evt: React.FormEvent + ) => { + evt.preventDefault(); + const formData = new FormData(evt.currentTarget); + const username = formData.get("lastfmUsername"); + const startdate = formData.get("lastFMStartDatetime"); + try { + const response = await fetch(`/settings/music-services/lastfm/connect/`, { + method: "POST", + body: JSON.stringify({ + external_user_id: username, + latest_listened_at: startdate || null, + }), + headers: { + "Content-Type": "application/json", + }, + }); + + if (response.ok) { + toast.success( + + ); + + setPermissions((prevState) => ({ + ...prevState, + lastfm: "import", + })); + } else { + if (response.bodyUsed) { + const body = await response.json(); + if (body.error) { + throw body.error; + } + } + throw response.statusText; + } + } catch (error) { + toast.error( + + ); + } + }; + return ( <> @@ -290,6 +342,84 @@ export default function MusicServices() { +
+
+

Last.FM

+
+
+

+ Connect to your Last.FM account to automatically add your + scrobbles to your ListenBrainz listens. +

+

+ You must first disable the "Hide recent listening + information" setting in your Last.fm{" "} + + privacy settings + + . +

+
+
+
+ + +
+
+ + +
+
+
+
+ + +
+
+
+
+

SoundCloud

diff --git a/frontend/js/src/settings/music-services/details/components/ExternalServiceButton.tsx b/frontend/js/src/settings/music-services/details/components/ExternalServiceButton.tsx index 7c5a865c43..b2592ff102 100644 --- a/frontend/js/src/settings/music-services/details/components/ExternalServiceButton.tsx +++ b/frontend/js/src/settings/music-services/details/components/ExternalServiceButton.tsx @@ -1,7 +1,12 @@ import * as React from "react"; type ExternalServiceButtonProps = { - service: "spotify" | "soundcloud" | "critiquebrainz" | "appleMusic"; + service: + | "spotify" + | "soundcloud" + | "critiquebrainz" + | "appleMusic" + | "lastfm"; current: string; value: string; title: string; diff --git a/listenbrainz/db/external_service_oauth.py b/listenbrainz/db/external_service_oauth.py index b148165015..16722d89d7 100644 --- a/listenbrainz/db/external_service_oauth.py +++ b/listenbrainz/db/external_service_oauth.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import List, Optional, Union from sqlalchemy import text @@ -7,8 +8,9 @@ import sqlalchemy -def save_token(db_conn, user_id: int, service: ExternalServiceType, access_token: str, refresh_token: Optional[str], - token_expires_ts: int, record_listens: bool, scopes: List[str], external_user_id: Optional[str] = None): +def save_token(db_conn, user_id: int, service: ExternalServiceType, access_token: Optional[str], refresh_token: Optional[str], + token_expires_ts: Optional[int], record_listens: bool, scopes: Optional[List[str]], external_user_id: Optional[str] = None, + latest_listened_at: Optional[datetime] = None): """ Add a row to the external_service_oauth table for specified user with corresponding tokens and information. Args: @@ -21,6 +23,7 @@ def save_token(db_conn, user_id: int, service: ExternalServiceType, access_token record_listens: True if user wishes to import listens, False otherwise scopes: the oauth scopes external_user_id: the user's id in the external linked service + latest_listened_at: last listen import time """ # regardless of whether a row is inserted or updated, the end result of the query # should remain the same. if not so, weird things can happen as it is likely we @@ -29,7 +32,7 @@ def save_token(db_conn, user_id: int, service: ExternalServiceType, access_token # to use the new values. any column which does not have a new value to be set should # be explicitly set to the default value (which would have been used if the row was # inserted instead). - token_expires = utils.unix_timestamp_to_datetime(token_expires_ts) + token_expires = utils.unix_timestamp_to_datetime(token_expires_ts) if token_expires_ts else None result = db_conn.execute(sqlalchemy.text(""" INSERT INTO external_service_oauth AS eso (user_id, external_user_id, service, access_token, refresh_token, token_expires, scopes) @@ -58,20 +61,21 @@ def save_token(db_conn, user_id: int, service: ExternalServiceType, access_token external_service_oauth_id = result.fetchone().id db_conn.execute(sqlalchemy.text(""" INSERT INTO listens_importer - (external_service_oauth_id, user_id, service) + (external_service_oauth_id, user_id, service, latest_listened_at) VALUES - (:external_service_oauth_id, :user_id, :service) + (:external_service_oauth_id, :user_id, :service, :latest_listened_at) ON CONFLICT (user_id, service) DO UPDATE SET external_service_oauth_id = EXCLUDED.external_service_oauth_id, user_id = EXCLUDED.user_id, service = EXCLUDED.service, last_updated = NULL, - latest_listened_at = NULL, + latest_listened_at = EXCLUDED.latest_listened_at, error_message = NULL """), { "external_service_oauth_id": external_service_oauth_id, "user_id": user_id, - "service": service.value + "service": service.value, + "latest_listened_at": latest_listened_at, }) db_conn.commit() diff --git a/listenbrainz/db/listens_importer.py b/listenbrainz/db/listens_importer.py index fd2f80e990..5711146577 100644 --- a/listenbrainz/db/listens_importer.py +++ b/listenbrainz/db/listens_importer.py @@ -1,6 +1,8 @@ import datetime from typing import Optional, Union +from sqlalchemy import text + from data.model.external_service import ExternalServiceType from listenbrainz import utils import sqlalchemy @@ -76,3 +78,36 @@ def get_latest_listened_at(db_conn, user_id: int, service: ExternalServiceType) }) row = result.fetchone() return row.latest_listened_at if row else None + + +def get_active_users_to_process(db_conn, service, include_error=False) -> list[dict]: + """ Returns a list of users whose listens should be imported from the external service. + """ + filters = ["external_service_oauth.service = :service"] + if include_error: + filters.append("error_message IS NULL") + filter_str = " AND ".join(filters) + + result = db_conn.execute(text(f""" + SELECT external_service_oauth.user_id + , "user".musicbrainz_id + , "user".musicbrainz_row_id + , access_token + , refresh_token + , listens_importer.last_updated + , token_expires + , scopes + , latest_listened_at + , external_service_oauth.external_user_id + , error_message + FROM external_service_oauth + JOIN "user" + ON "user".id = external_service_oauth.user_id + JOIN listens_importer + ON listens_importer.external_service_oauth_id = external_service_oauth.id + WHERE {filter_str} + ORDER BY latest_listened_at DESC NULLS LAST + """), {"service": service.value}) + users = [row for row in result.mappings()] + db_conn.rollback() + return users diff --git a/listenbrainz/db/spotify.py b/listenbrainz/db/spotify.py index b7b25b9124..3bdc9ac2b8 100644 --- a/listenbrainz/db/spotify.py +++ b/listenbrainz/db/spotify.py @@ -4,32 +4,6 @@ import sqlalchemy -def get_active_users_to_process(db_conn) -> List[dict]: - """ Returns a list of users whose listens should be imported from Spotify. - """ - result = db_conn.execute(sqlalchemy.text(""" - SELECT external_service_oauth.user_id - , "user".musicbrainz_id - , "user".musicbrainz_row_id - , access_token - , refresh_token - , listens_importer.last_updated - , token_expires - , scopes - , latest_listened_at - , error_message - FROM external_service_oauth - JOIN "user" - ON "user".id = external_service_oauth.user_id - JOIN listens_importer - ON listens_importer.external_service_oauth_id = external_service_oauth.id - WHERE external_service_oauth.service = 'spotify' - AND error_message IS NULL - ORDER BY latest_listened_at DESC NULLS LAST - """)) - return [row for row in result.mappings()] - - def get_user_import_details(db_conn, user_id: int) -> Optional[dict]: """ Return user's spotify linking details to display on connect services page diff --git a/listenbrainz/domain/importer_service.py b/listenbrainz/domain/importer_service.py index d0767857e2..6293ad78e4 100644 --- a/listenbrainz/domain/importer_service.py +++ b/listenbrainz/domain/importer_service.py @@ -1,5 +1,7 @@ from abc import ABC -from typing import List, Union +from typing import Union + +from flask import current_app from listenbrainz.domain.external_service import ExternalService, ExternalServiceError from listenbrainz.db import listens_importer @@ -10,9 +12,9 @@ class ImporterService(ExternalService, ABC): """ Base class that external music services which also allow to import listen history to ListenBrainz should implement.""" - def get_active_users_to_process(self) -> List[dict]: + def get_active_users_to_process(self, include_error=False) -> list[dict]: """ Return list of active users for importing listens. """ - raise NotImplementedError() + return listens_importer.get_active_users_to_process(db_conn, self.service, include_error) def update_user_import_status(self, user_id: int, error: str = None): """ Update the last_update field for user with specified user ID. @@ -32,6 +34,7 @@ def update_latest_listen_ts(self, user_id: int, timestamp: Union[int, float]): user_id: the ListenBrainz row ID of the user timestamp: the unix timestamp of the latest listen imported for the user """ + current_app.logger.info(f"Updating latest_listen_ts for user {user_id}, service {self.service}, timestamp {timestamp}") listens_importer.update_latest_listened_at(db_conn, user_id, self.service, timestamp) diff --git a/listenbrainz/domain/lastfm.py b/listenbrainz/domain/lastfm.py index 4add6040bf..ee37663d72 100644 --- a/listenbrainz/domain/lastfm.py +++ b/listenbrainz/domain/lastfm.py @@ -8,6 +8,9 @@ from brainzutils import musicbrainz_db +from data.model.external_service import ExternalServiceType +from listenbrainz.db import external_service_oauth +from listenbrainz.domain.importer_service import ImporterService from listenbrainz.webserver import db_conn from listenbrainz.webserver.errors import APINotFound @@ -138,3 +141,17 @@ def import_feedback(user_id: int, lfm_user: str) -> dict: counts["inserted"] = len(recording_feedback) return counts + + +class LastfmService(ImporterService): + + def __init__(self): + super(LastfmService, self).__init__(ExternalServiceType.LASTFM) + + def add_new_user(self, user_id: int, token: dict) -> bool: + external_service_oauth.save_token( + db_conn, user_id=user_id, service=self.service, access_token=None, refresh_token=None, + token_expires_ts=None, record_listens=True, scopes=[], external_user_id=token["external_user_id"], + latest_listened_at=token["latest_listened_at"] + ) + return True diff --git a/listenbrainz/domain/spotify.py b/listenbrainz/domain/spotify.py index 37bc5b6cea..4019941d62 100644 --- a/listenbrainz/domain/spotify.py +++ b/listenbrainz/domain/spotify.py @@ -194,8 +194,3 @@ def date_to_iso(date): user['latest_listened_at_iso'] = date_to_iso(user['latest_listened_at']) user['last_updated_iso'] = date_to_iso(user['last_updated']) return user - - def get_active_users_to_process(self): - """ Returns a list of Spotify user instances that need their Spotify listens imported. - """ - return spotify.get_active_users_to_process(db_conn) diff --git a/listenbrainz/spotify_updater/__init__.py b/listenbrainz/listens_importer/__init__.py similarity index 100% rename from listenbrainz/spotify_updater/__init__.py rename to listenbrainz/listens_importer/__init__.py diff --git a/listenbrainz/listens_importer/base.py b/listenbrainz/listens_importer/base.py new file mode 100644 index 0000000000..e88396daaf --- /dev/null +++ b/listenbrainz/listens_importer/base.py @@ -0,0 +1,178 @@ +import abc +import time +from abc import abstractmethod + +from brainzutils import metrics +from flask import current_app, render_template +from psycopg2 import DatabaseError +from sqlalchemy.exc import SQLAlchemyError +from werkzeug.exceptions import InternalServerError, ServiceUnavailable + +from brainzutils.mail import send_mail + +from listenbrainz.db.exceptions import DatabaseException +from listenbrainz.domain.external_service import ExternalServiceError +from listenbrainz.webserver.errors import ListenValidationError +from listenbrainz.webserver.models import SubmitListenUserMetadata +from listenbrainz.webserver.views.api_tools import LISTEN_TYPE_IMPORT, insert_payload, validate_listen, \ + LISTEN_TYPE_SINGLE, LISTEN_TYPE_PLAYING_NOW + +from listenbrainz.db import user as db_user +import listenbrainz + +METRIC_UPDATE_INTERVAL = 60 # seconds + + +class ListensImporter(abc.ABC): + + def __init__(self, name, user_friendly_name, service): + self.name = name + self.user_friendly_name = user_friendly_name + self.service = service + # number of listens imported since last metric update was submitted + self._listens_imported_since_last_update = 0 + self._metric_submission_time = time.monotonic() + METRIC_UPDATE_INTERVAL + self.include_error = False + + def notify_error(self, musicbrainz_id: str, error: str): + """ Notifies specified user via email about error during Spotify import. + + Args: + musicbrainz_id: the MusicBrainz ID of the user + error: a description of the error encountered. + """ + user_email = db_user.get_by_mb_id(listenbrainz.webserver.db_conn, musicbrainz_id, fetch_email=True)["email"] + if not user_email: + return + + link = current_app.config['SERVER_ROOT_URL'] + '/settings/music-services/details/' + text = render_template('emails/listens_importer_error.txt', error=error, link=link) + send_mail( + subject=f'ListenBrainz {self.user_friendly_name} Importer Error', + text=text, + recipients=[user_email], + from_name='ListenBrainz', + from_addr='noreply@' + current_app.config['MAIL_FROM_DOMAIN'], + ) + + def parse_and_validate_listen_items(self, converter, items): + """ Converts and validates the listens received from the external service API. + + Args: + converter: a function to parse the incoming items that returns a tuple of (listen, listen_type) + items: a list of listen events received from the external + + Returns: + tuple of (now playing listen, a list of recent listens to submit to ListenBrainz, timestamp of latest listen) + """ + now_playing_listen = None + listens = [] + latest_listen_ts = None + + for item in items: + listen, listen_type = converter(item) + + if listen_type == LISTEN_TYPE_IMPORT and \ + (latest_listen_ts is None or listen['listened_at'] > latest_listen_ts): + latest_listen_ts = listen['listened_at'] + + try: + validate_listen(listen, listen_type) + if listen_type == LISTEN_TYPE_IMPORT or listen_type == LISTEN_TYPE_SINGLE: + listens.append(listen) + + # set the first now playing listen to now_playing and ignore the rest + if listen_type == LISTEN_TYPE_PLAYING_NOW and now_playing_listen is None: + now_playing_listen = listen + except ListenValidationError: + pass + return now_playing_listen, listens, latest_listen_ts + + def submit_listens_to_listenbrainz(self, user: dict, listens: list[dict], listen_type=LISTEN_TYPE_IMPORT): + """ Submit a batch of listens to ListenBrainz + + Args: + user: the user whose listens are to be submitted, dict should contain + at least musicbrainz_id and user_id + listens: a list of listens to be submitted + listen_type: the type of listen (single, import, playing_now) + """ + username = user['musicbrainz_id'] + user_metadata = SubmitListenUserMetadata(user_id=user['user_id'], musicbrainz_id=username) + retries = 10 + while retries >= 0: + try: + current_app.logger.debug('Submitting %d listens for user %s', len(listens), username) + insert_payload(listens, user_metadata, listen_type=listen_type) + current_app.logger.debug('Submitted!') + break + except (InternalServerError, ServiceUnavailable) as e: + retries -= 1 + current_app.logger.error('ISE while trying to import listens for %s: %s', username, str(e)) + if retries == 0: + raise ExternalServiceError('ISE while trying to import listens: %s', str(e)) + + @abstractmethod + def process_one_user(self, user): + pass + + def process_all_users(self): + """ Get a batch of users to be processed and import their listens. + + Returns: + (success, failure) where + success: the number of users whose plays were successfully imported. + failure: the number of users for whom we faced errors while importing. + """ + try: + users = self.service.get_active_users_to_process(self.include_error) + except DatabaseException as e: + listenbrainz.webserver.db_conn.rollback() + current_app.logger.error('Cannot get list of users due to error %s', str(e), exc_info=True) + return 0, 0 + + if not users: + return 0, 0 + + current_app.logger.info('Process %d users...' % len(users)) + success = 0 + failure = 0 + for user in users: + try: + self._listens_imported_since_last_update += self.process_one_user(user) + success += 1 + except (DatabaseException, DatabaseError, SQLAlchemyError): + listenbrainz.webserver.db_conn.rollback() + current_app.logger.error(f'{self.name} could not import listens for user %s:', + user['musicbrainz_id'], exc_info=True) + except Exception: + current_app.logger.error(f'{self.name} could not import listens for user %s:', + user['musicbrainz_id'], exc_info=True) + failure += 1 + + if time.monotonic() > self._metric_submission_time: + self._metric_submission_time += METRIC_UPDATE_INTERVAL + metrics.set(self.name, imported_listens=self._listens_imported_since_last_update) + _listens_imported_since_last_update = 0 + + current_app.logger.info('Processed %d users successfully!', success) + current_app.logger.info('Encountered errors while processing %d users.', failure) + return success, failure + + def main(self): + current_app.logger.info(f'{self.name} started...') + while True: + t = time.monotonic() + success, failure = self.process_all_users() + total_users = success + failure + if total_users > 0: + total_time = time.monotonic() - t + avg_time = total_time / total_users + metrics.set(self.name, + users_processed=total_users, + time_to_process_all_users=total_time, + time_to_process_one_user=avg_time) + current_app.logger.info('All %d users in batch have been processed.', total_users) + current_app.logger.info('Total time taken: %.2f s, average time per user: %.2f s.', total_time, + avg_time) + time.sleep(10) diff --git a/listenbrainz/listens_importer/lastfm.py b/listenbrainz/listens_importer/lastfm.py new file mode 100644 index 0000000000..1d6f507c59 --- /dev/null +++ b/listenbrainz/listens_importer/lastfm.py @@ -0,0 +1,162 @@ +#!/usr/bin/python3 + +import requests +from flask import current_app +from requests.adapters import HTTPAdapter +from urllib3 import Retry + +from listenbrainz.domain.external_service import ExternalServiceError, ExternalServiceAPIError +from listenbrainz.domain.lastfm import LastfmService +from listenbrainz.listens_importer.base import ListensImporter +from listenbrainz.listenstore import LISTEN_MINIMUM_DATE +from listenbrainz.webserver import create_app +from listenbrainz.webserver.views.api_tools import LISTEN_TYPE_IMPORT, \ + LISTEN_TYPE_PLAYING_NOW + + +class LastfmImporter(ListensImporter): + + def __init__(self): + super(LastfmImporter, self).__init__( + name='LastfmImporter', + user_friendly_name="Last.fm", + service=LastfmService(), + ) + + @staticmethod + def convert_scrobble_to_listen(scrobble): + """ Converts data retrieved from the last.fm API into a listen. """ + track_name = scrobble.get("name") + track_mbid = scrobble.get("mbid") + + artist = scrobble.get("artist", {}) + artist_name = artist.get("#text") + artist_mbid = artist.get("mbid") + + album = scrobble.get("album") + album_name = album.get("#text") + album_mbid = album.get("mbid") + + if "date" in scrobble: + listened_at = int(scrobble["date"]["uts"]) + listen_type = LISTEN_TYPE_IMPORT + listen = {"listened_at": listened_at} + else: + # todo: check now playing @attr + listen_type = LISTEN_TYPE_PLAYING_NOW + listen = {} + + if not track_name or not artist_name: + return None, None + + track_metadata = { + "artist_name": artist_name, + "track_name": track_name, + } + if album_name: + track_metadata["release_name"] = album_name + + additional_info = { + "submission_client": "ListenBrainz lastfm importer v2" + } + if track_mbid: + additional_info["lastfm_track_mbid"] = track_mbid + if artist_mbid: + additional_info["lastfm_artist_mbid"] = artist_mbid + if album_mbid: + additional_info["lastfm_release_mbid"] = album_mbid + + if additional_info: + track_metadata["additional_info"] = additional_info + + listen["track_metadata"] = track_metadata + return listen, listen_type + + + def get_user_recent_tracks(self, session, user, page): + """ Get user’s recently played tracks from last.fm api. """ + latest_listened_at = user["latest_listened_at"] or LISTEN_MINIMUM_DATE + params = { + "method": "user.getrecenttracks", + "format": "json", + "api_key": current_app.config["LASTFM_API_KEY"], + "limit": 200, + "user": user["external_user_id"], + "from": int(latest_listened_at.timestamp()), + "page": page + } + response = session.get(current_app.config["LASTFM_API_URL"], params=params) + match response.status_code: + case 200: + return response.json() + case 404: + raise ExternalServiceError("Last.FM user with username %s not found" % (params["user"],)) + case 429: + raise ExternalServiceError("Encountered a rate limit.") + case _: + raise ExternalServiceAPIError('Error from the lastfm API while getting listens: %s' % (str(response.text),)) + + + def process_one_user(self, user: dict) -> int: + """ Get recently played songs for this user and submit them to ListenBrainz. + + Returns: + the number of recently played listens imported for the user + """ + try: + imported_listen_count = 0 + session = requests.Session() + session.mount( + "https://", + HTTPAdapter(max_retries=Retry( + total=3, + backoff_factor=1, + allowed_methods=["GET"], + # retry on 400 because last.fm wraps some service errors in 400 errors + status_forcelist=[400, 413, 429, 500, 502, 503, 504] + )) + ) + + response = self.get_user_recent_tracks(session, user, page=1) + pages = int(response["recenttracks"]["@attr"]["totalPages"]) + + for page in range(pages, 0, -1): + current_app.logger.info("Processing page %s", page) + response = self.get_user_recent_tracks(session, user, page) + now_playing_listen, listens, latest_listened_at = self.parse_and_validate_listen_items( + self.convert_scrobble_to_listen, + response["recenttracks"]["track"] + ) + + if now_playing_listen is not None: + self.submit_listens_to_listenbrainz(user, [now_playing_listen], listen_type=LISTEN_TYPE_PLAYING_NOW) + current_app.logger.info('imported now playing listen for %s' % (str(user['musicbrainz_id']),)) + imported_listen_count += 1 + + if listens: + self.submit_listens_to_listenbrainz(user, listens, listen_type=LISTEN_TYPE_IMPORT) + self.service.update_latest_listen_ts(user['user_id'], latest_listened_at) + current_app.logger.info('imported %d listens for %s' % (len(listens), str(user['musicbrainz_id']))) + imported_listen_count += len(listens) + + return imported_listen_count + except ExternalServiceAPIError as e: + # if it is an error from the Spotify API, show the error message to the user + self.service.update_user_import_status(user_id=user['user_id'], error=str(e)) + if not current_app.config['TESTING']: + self.notify_error(user['musicbrainz_id'], str(e)) + raise e + + def process_all_users(self): + # todo: last.fm is prone to errors, especially for entire history imports. currently doing alternate passes + # where we ignore and reattempt + result = super().process_all_users() + self.include_error = not self.include_error + return result + + +if __name__ == '__main__': + app = create_app() + with app.app_context(): + importer = LastfmImporter() + importer.main() diff --git a/listenbrainz/listens_importer/spotify.py b/listenbrainz/listens_importer/spotify.py new file mode 100644 index 0000000000..ba508715d6 --- /dev/null +++ b/listenbrainz/listens_importer/spotify.py @@ -0,0 +1,294 @@ +#!/usr/bin/python3 + +import time + +import spotipy +from dateutil import parser +from flask import current_app +from spotipy import SpotifyException + +from listenbrainz.domain.external_service import ExternalServiceError, ExternalServiceAPIError, \ + ExternalServiceInvalidGrantError +from listenbrainz.domain.spotify import SpotifyService + +from listenbrainz.listens_importer.base import ListensImporter +from listenbrainz.webserver import create_app +from listenbrainz.webserver.views.api_tools import LISTEN_TYPE_IMPORT, LISTEN_TYPE_PLAYING_NOW + + +class SpotifyImporter(ListensImporter): + + def __init__(self): + super(SpotifyImporter, self).__init__( + name='spotify_reader', + user_friendly_name="Spotify", + service=SpotifyService(), + ) + + @staticmethod + def _convert_spotify_play_to_listen(play, listen_type): + """ Converts data retrieved from the Spotify API into a listen. + + Args: + play (dict): a dict that represents a listen retrieved from Spotify + , this should be an "item" from the spotify response. + listen_type: the type of the listen (import or playing_now) + + Returns: + listen (dict): dict that can be submitted to ListenBrainz + """ + if listen_type == LISTEN_TYPE_PLAYING_NOW: + track = play + listen = {} + else: + track = play['track'] + listen = { + 'listened_at': parser.parse(play['played_at']).timestamp(), + } + + if track is None: + return None + + artists = track.get('artists', []) + artist_names = [] + spotify_artist_ids = [] + for a in artists: + name = a.get('name') + if name is not None: + artist_names.append(name) + spotify_id = a.get('external_urls', {}).get('spotify') + if spotify_id is not None: + spotify_artist_ids.append(spotify_id) + artist_name = ', '.join(artist_names) + + album = track.get('album', {}) + album_artists = album.get('artists', []) + release_artist_names = [] + spotify_album_artist_ids = [] + for a in album_artists: + name = a.get('name') + if name is not None: + release_artist_names.append(name) + spotify_id = a.get('external_urls', {}).get('spotify') + if spotify_id is not None: + spotify_album_artist_ids.append(spotify_id) + album_artist_name = ', '.join(release_artist_names) + + additional = { + 'tracknumber': track.get('track_number'), + 'spotify_artist_ids': spotify_artist_ids, + 'artist_names': artist_names, + 'discnumber': track.get('disc_number'), + 'duration_ms': track.get('duration_ms'), + 'spotify_album_id': album.get('external_urls', {}).get('spotify'), + # Named 'release_*' because 'release_name' is an official name in the docs + 'release_artist_name': album_artist_name, + 'release_artist_names': release_artist_names, + # Named 'album_*' because Spotify calls it album and this is spotify-specific + 'spotify_album_artist_ids': spotify_album_artist_ids, + 'submission_client': 'listenbrainz', + 'music_service': 'spotify.com' + } + isrc = track.get('external_ids', {}).get('isrc') + spotify_url = track.get('external_urls', {}).get('spotify') + if isrc: + additional['isrc'] = isrc + if spotify_url: + additional['spotify_id'] = spotify_url + additional['origin_url'] = spotify_url + + listen['track_metadata'] = { + 'artist_name': artist_name, + 'track_name': track['name'], + 'release_name': album['name'], + 'additional_info': additional, + } + return listen, listen_type + + def convert_spotify_current_play_to_listen(self, play): + return self._convert_spotify_play_to_listen(play, LISTEN_TYPE_PLAYING_NOW) + + def convert_spotify_recent_play_to_listen(self, play): + return self._convert_spotify_play_to_listen(play, LISTEN_TYPE_IMPORT) + + def make_api_request(self, user: dict, endpoint: str, **kwargs): + """ Make an request to the Spotify API for particular user at specified endpoint with args. + + Args: + user: the user whose plays are to be imported. + endpoint: the name of Spotipy function which makes request to the required API endpoint + + Returns: + the response from the spotify API + + Raises: + ExternalServiceAPIError: if we encounter errors from the Spotify API. + ExternalServiceError: if we encounter a rate limit, even after retrying. + """ + retries = 10 + delay = 1 + tried_to_refresh_token = False + + while retries > 0: + try: + spotipy_client = spotipy.Spotify(auth=user['access_token']) + spotipy_call = getattr(spotipy_client, endpoint) + recently_played = spotipy_call(**kwargs) + return recently_played + except (AttributeError, TypeError): + current_app.logger.critical("Invalid spotipy endpoint or arguments:", exc_info=True) + return None + except SpotifyException as e: + retries -= 1 + if e.http_status == 429: + # Rate Limit Problems -- the client handles these, but it can still give up + # after a certain number of retries, so we look at the header and try the + # request again, if the error is raised + try: + time_to_sleep = int(e.headers.get('Retry-After', delay)) + except ValueError: + time_to_sleep = delay + current_app.logger.warning('Encountered a rate limit, sleeping %d seconds and trying again...', + time_to_sleep) + time.sleep(time_to_sleep) + delay += 1 + if retries == 0: + raise ExternalServiceError('Encountered a rate limit.') + + elif e.http_status in (400, 403): + current_app.logger.critical('Error from the Spotify API for user %s: %s', user['musicbrainz_id'], + str(e), exc_info=True) + raise ExternalServiceAPIError('Error from the Spotify API while getting listens: %s', str(e)) + elif e.http_status >= 500 and e.http_status < 600: + # these errors are not our fault, most probably. so just log them and retry. + current_app.logger.error('Error while trying to get listens for user %s: %s', + user['musicbrainz_id'], str(e), exc_info=True) + if retries == 0: + raise ExternalServiceAPIError('Error from the spotify API while getting listens: %s', str(e)) + + elif e.http_status == 401: + # if we get 401 Unauthorized from Spotify, that means our token might have expired. + # In that case, try to refresh the token, if there is an error even while refreshing + # give up and report to the user. + # We only try to refresh the token once, if we still get 401 after that, we give up. + if not tried_to_refresh_token: + user = SpotifyService().refresh_access_token(user['user_id'], user['refresh_token']) + tried_to_refresh_token = True + + else: + raise ExternalServiceAPIError( + 'Could not authenticate with Spotify, please unlink and link your account again.') + elif e.http_status == 404: + current_app.logger.error("404 while trying to get listens for user %s", str(user), exc_info=True) + if retries == 0: + raise ExternalServiceError("404 while trying to get listens for user %s" % str(user)) + except Exception as e: + retries -= 1 + current_app.logger.error('Unexpected error while getting listens: %s', str(e), exc_info=True) + if retries == 0: + raise ExternalServiceError('Unexpected error while getting listens: %s' % str(e)) + + def get_user_recently_played(self, user): + """ Get tracks from the current user’s recently played tracks. """ + latest_listened_at_ts = 0 + if user['latest_listened_at']: + latest_listened_at_ts = int(user['latest_listened_at'].timestamp() * 1000) # latest listen UNIX ts in ms + + return self.make_api_request(user, 'current_user_recently_played', limit=50, after=latest_listened_at_ts) + + def get_user_currently_playing(self, user): + """ Get the user's currently playing track. + """ + return self.make_api_request(user, 'current_user_playing_track') + + def process_one_user(self, user: dict) -> int: + """ Get recently played songs for this user and submit them to ListenBrainz. + + Args: + user (spotify.Spotify): the user whose plays are to be imported. + + Raises: + spotify.SpotifyAPIError: if we encounter errors from the Spotify API. + spotify.SpotifyListenBrainzError: if we encounter a rate limit, even after retrying. + or if we get errors while submitting the data to ListenBrainz + Returns: + the number of recently played listens imported for the user + """ + try: + if self.service.user_oauth_token_has_expired(user): + user = self.service.refresh_access_token(user['user_id'], user['refresh_token']) + + listens = [] + latest_listened_at = None + + # If there is no playback, currently_playing will be None. + # There are two playing types, track and episode. We use only the + # track type. Therefore, when the user's playback type is not a track, + # Spotify will set the item field to null which becomes None after + # parsing the JSON. Due to these reasons, we cannot simplify the + # checks below. + currently_playing = self.get_user_currently_playing(user) + if currently_playing is not None: + currently_playing_item = currently_playing.get('item', None) + if currently_playing_item is not None: + current_app.logger.debug('Received a currently playing track for %s', str(user)) + now_playing_listen, _, _ = self.parse_and_validate_listen_items( + self.convert_spotify_current_play_to_listen, + [currently_playing_item] + ) + if now_playing_listen: + self.submit_listens_to_listenbrainz(user, [now_playing_listen], listen_type=LISTEN_TYPE_PLAYING_NOW) + + recently_played = self.get_user_recently_played(user) + if recently_played is not None and 'items' in recently_played: + _, listens, latest_listened_at = self.parse_and_validate_listen_items( + self.convert_spotify_recent_play_to_listen, + recently_played['items'] + ) + current_app.logger.debug('Received %d tracks for %s', len(listens), str(user)) + + # if we don't have any new listens, return. we don't check whether the listens list is empty here + # because it will empty in both cases where we don't receive any listens and when we receive only + # bad listens. so instead we check latest_listened_at which is None only in case when we received + # nothing from spotify. + if latest_listened_at is None: + self.service.update_user_import_status(user['user_id']) + return 0 + + self.submit_listens_to_listenbrainz(user, listens, listen_type=LISTEN_TYPE_IMPORT) + + # we've succeeded so update the last_updated and latest_listened_at field for this user + self.service.update_latest_listen_ts(user['user_id'], latest_listened_at) + + current_app.logger.info('imported %d listens for %s' % (len(listens), str(user['musicbrainz_id']))) + return len(listens) + + except ExternalServiceInvalidGrantError: + error_message = "It seems like you've revoked permission for us to read your spotify account" + self.service.update_user_import_status(user_id=user['user_id'], error=error_message) + if not current_app.config['TESTING']: + self.notify_error(user['musicbrainz_id'], error_message) + # user has revoked authorization through spotify ui or deleted their spotify account etc. + # + # we used to remove spotify access tokens from our database whenever we detected token revocation + # at one point. but one day spotify had a downtime while resulted in false revocation errors, and + # we ended up deleting most of our users' spotify access tokens. now we don't remove the token from + # database. this is actually more resilient and without downsides. if a user actually revoked their + # token, then its useless anyway so doesn't matter if we remove it. and if it is a false revocation + # error, we are saved! :) in any case, we do set an error message for the user in the database + # so that we can skip in future runs and notify them to reconnect if they want. + raise ExternalServiceError("User has revoked spotify authorization") + + except ExternalServiceAPIError as e: + # if it is an error from the Spotify API, show the error message to the user + self.service.update_user_import_status(user_id=user['user_id'], error=str(e)) + if not current_app.config['TESTING']: + self.notify_error(user['musicbrainz_id'], str(e)) + raise ExternalServiceError("Could not refresh user token from spotify") + + +if __name__ == '__main__': + app = create_app() + with app.app_context(): + importer = SpotifyImporter() + importer.main() diff --git a/listenbrainz/spotify_updater/tests/__init__.py b/listenbrainz/listens_importer/tests/__init__.py similarity index 100% rename from listenbrainz/spotify_updater/tests/__init__.py rename to listenbrainz/listens_importer/tests/__init__.py diff --git a/listenbrainz/spotify_updater/tests/data/spotify_play_no_isrc.json b/listenbrainz/listens_importer/tests/data/spotify_play_no_isrc.json similarity index 100% rename from listenbrainz/spotify_updater/tests/data/spotify_play_no_isrc.json rename to listenbrainz/listens_importer/tests/data/spotify_play_no_isrc.json diff --git a/listenbrainz/spotify_updater/tests/data/spotify_play_two_artists.json b/listenbrainz/listens_importer/tests/data/spotify_play_two_artists.json similarity index 100% rename from listenbrainz/spotify_updater/tests/data/spotify_play_two_artists.json rename to listenbrainz/listens_importer/tests/data/spotify_play_two_artists.json diff --git a/listenbrainz/spotify_updater/tests/test_spotify_read_listens.py b/listenbrainz/listens_importer/tests/test_spotify_read_listens.py similarity index 99% rename from listenbrainz/spotify_updater/tests/test_spotify_read_listens.py rename to listenbrainz/listens_importer/tests/test_spotify_read_listens.py index 3287d6f7bb..60a2bee63e 100644 --- a/listenbrainz/spotify_updater/tests/test_spotify_read_listens.py +++ b/listenbrainz/listens_importer/tests/test_spotify_read_listens.py @@ -10,7 +10,7 @@ from listenbrainz.domain.external_service import ExternalServiceAPIError, \ ExternalServiceInvalidGrantError from listenbrainz.domain.spotify import SpotifyService -from listenbrainz.spotify_updater import spotify_read_listens +from listenbrainz.listens_importer import spotify_read_listens from listenbrainz.webserver.views.api_tools import LISTEN_TYPE_IMPORT from unittest.mock import patch from listenbrainz.db.testing import DatabaseTestCase diff --git a/listenbrainz/spotify_updater/spotify_read_listens.py b/listenbrainz/spotify_updater/spotify_read_listens.py deleted file mode 100644 index 6d5485f189..0000000000 --- a/listenbrainz/spotify_updater/spotify_read_listens.py +++ /dev/null @@ -1,430 +0,0 @@ -#!/usr/bin/python3 -import time -from typing import Dict, List - -import spotipy -from brainzutils import metrics -from brainzutils.mail import send_mail -from dateutil import parser -from flask import current_app, render_template -from spotipy import SpotifyException -from sqlalchemy.exc import DatabaseError, SQLAlchemyError -from werkzeug.exceptions import InternalServerError, ServiceUnavailable - -import listenbrainz.webserver -from listenbrainz.db import user as db_user -from listenbrainz.db.exceptions import DatabaseException -from listenbrainz.domain.external_service import ExternalServiceError, ExternalServiceAPIError, \ - ExternalServiceInvalidGrantError -from listenbrainz.domain.spotify import SpotifyService -from listenbrainz.webserver.errors import ListenValidationError -from listenbrainz.webserver.models import SubmitListenUserMetadata -from listenbrainz.webserver.views.api_tools import insert_payload, validate_listen, LISTEN_TYPE_IMPORT, \ - LISTEN_TYPE_PLAYING_NOW - -METRIC_UPDATE_INTERVAL = 60 # seconds -_listens_imported_since_last_update = 0 # number of listens imported since last metric update was submitted -_metric_submission_time = time.monotonic() + METRIC_UPDATE_INTERVAL - - -def notify_error(musicbrainz_id: str, error: str): - """ Notifies specified user via email about error during Spotify import. - - Args: - musicbrainz_id: the MusicBrainz ID of the user - error: a description of the error encountered. - """ - user_email = db_user.get_by_mb_id(listenbrainz.webserver.db_conn, musicbrainz_id, fetch_email=True)["email"] - if not user_email: - return - - spotify_url = current_app.config['SERVER_ROOT_URL'] + '/settings/music-services/details/' - text = render_template('emails/spotify_import_error.txt', error=error, link=spotify_url) - send_mail( - subject='ListenBrainz Spotify Importer Error', - text=text, - recipients=[user_email], - from_name='ListenBrainz', - from_addr='noreply@'+current_app.config['MAIL_FROM_DOMAIN'], - ) - - -def _convert_spotify_play_to_listen(play, listen_type): - """ Converts data retrieved from the Spotify API into a listen. - - Args: - play (dict): a dict that represents a listen retrieved from Spotify - , this should be an "item" from the spotify response. - listen_type: the type of the listen (import or playing_now) - - Returns: - listen (dict): dict that can be submitted to ListenBrainz - """ - if listen_type == LISTEN_TYPE_PLAYING_NOW: - track = play - listen = {} - else: - track = play['track'] - listen = { - 'listened_at': parser.parse(play['played_at']).timestamp(), - } - - if track is None: - return None - - artists = track.get('artists', []) - artist_names = [] - spotify_artist_ids = [] - for a in artists: - name = a.get('name') - if name is not None: - artist_names.append(name) - spotify_id = a.get('external_urls', {}).get('spotify') - if spotify_id is not None: - spotify_artist_ids.append(spotify_id) - artist_name = ', '.join(artist_names) - - album = track.get('album', {}) - album_artists = album.get('artists', []) - release_artist_names = [] - spotify_album_artist_ids = [] - for a in album_artists: - name = a.get('name') - if name is not None: - release_artist_names.append(name) - spotify_id = a.get('external_urls', {}).get('spotify') - if spotify_id is not None: - spotify_album_artist_ids.append(spotify_id) - album_artist_name = ', '.join(release_artist_names) - - additional = { - 'tracknumber': track.get('track_number'), - 'spotify_artist_ids': spotify_artist_ids, - 'artist_names': artist_names, - 'discnumber': track.get('disc_number'), - 'duration_ms': track.get('duration_ms'), - 'spotify_album_id': album.get('external_urls', {}).get('spotify'), - # Named 'release_*' because 'release_name' is an official name in the docs - 'release_artist_name': album_artist_name, - 'release_artist_names': release_artist_names, - # Named 'album_*' because Spotify calls it album and this is spotify-specific - 'spotify_album_artist_ids': spotify_album_artist_ids, - 'submission_client': 'listenbrainz', - 'music_service': 'spotify.com' - } - isrc = track.get('external_ids', {}).get('isrc') - spotify_url = track.get('external_urls', {}).get('spotify') - if isrc: - additional['isrc'] = isrc - if spotify_url: - additional['spotify_id'] = spotify_url - additional['origin_url'] = spotify_url - - listen['track_metadata'] = { - 'artist_name': artist_name, - 'track_name': track['name'], - 'release_name': album['name'], - 'additional_info': additional, - } - return listen - - -def make_api_request(user: dict, endpoint: str, **kwargs): - """ Make an request to the Spotify API for particular user at specified endpoint with args. - - Args: - user: the user whose plays are to be imported. - endpoint: the name of Spotipy function which makes request to the required API endpoint - - Returns: - the response from the spotify API - - Raises: - ExternalServiceAPIError: if we encounter errors from the Spotify API. - ExternalServiceError: if we encounter a rate limit, even after retrying. - """ - retries = 10 - delay = 1 - tried_to_refresh_token = False - - while retries > 0: - try: - spotipy_client = spotipy.Spotify(auth=user['access_token']) - spotipy_call = getattr(spotipy_client, endpoint) - recently_played = spotipy_call(**kwargs) - break - except (AttributeError, TypeError): - current_app.logger.critical("Invalid spotipy endpoint or arguments:", exc_info=True) - return None - except SpotifyException as e: - retries -= 1 - if e.http_status == 429: - # Rate Limit Problems -- the client handles these, but it can still give up - # after a certain number of retries, so we look at the header and try the - # request again, if the error is raised - try: - time_to_sleep = int(e.headers.get('Retry-After', delay)) - except ValueError: - time_to_sleep = delay - current_app.logger.warn('Encountered a rate limit, sleeping %d seconds and trying again...', time_to_sleep) - time.sleep(time_to_sleep) - delay += 1 - if retries == 0: - raise ExternalServiceError('Encountered a rate limit.') - - elif e.http_status in (400, 403): - current_app.logger.critical('Error from the Spotify API for user %s: %s', user['musicbrainz_id'], str(e), exc_info=True) - raise ExternalServiceAPIError('Error from the Spotify API while getting listens: %s', str(e)) - - elif e.http_status >= 500 and e.http_status < 600: - # these errors are not our fault, most probably. so just log them and retry. - current_app.logger.error('Error while trying to get listens for user %s: %s', user['musicbrainz_id'], str(e), exc_info=True) - if retries == 0: - raise ExternalServiceAPIError('Error from the spotify API while getting listens: %s', str(e)) - - elif e.http_status == 401: - # if we get 401 Unauthorized from Spotify, that means our token might have expired. - # In that case, try to refresh the token, if there is an error even while refreshing - # give up and report to the user. - # We only try to refresh the token once, if we still get 401 after that, we give up. - if not tried_to_refresh_token: - user = SpotifyService().refresh_access_token(user['user_id'], user['refresh_token']) - tried_to_refresh_token = True - - else: - raise ExternalServiceAPIError('Could not authenticate with Spotify, please unlink and link your account again.') - elif e.http_status == 404: - current_app.logger.error("404 while trying to get listens for user %s", str(user), exc_info=True) - if retries == 0: - raise ExternalServiceError("404 while trying to get listens for user %s" % str(user)) - except Exception as e: - retries -= 1 - current_app.logger.error('Unexpected error while getting listens: %s', str(e), exc_info=True) - if retries == 0: - raise ExternalServiceError('Unexpected error while getting listens: %s' % str(e)) - - return recently_played - - -def get_user_recently_played(user): - """ Get tracks from the current user’s recently played tracks. - """ - latest_listened_at_ts = 0 - if user['latest_listened_at']: - latest_listened_at_ts = int(user['latest_listened_at'].timestamp() * 1000) # latest listen UNIX ts in ms - - return make_api_request(user, 'current_user_recently_played', limit=50, after=latest_listened_at_ts) - - -def get_user_currently_playing(user): - """ Get the user's currently playing track. - """ - return make_api_request(user, 'current_user_playing_track') - - -def submit_listens_to_listenbrainz(user: Dict, listens: List, listen_type=LISTEN_TYPE_IMPORT): - """ Submit a batch of listens to ListenBrainz - - Args: - user: the user whose listens are to be submitted, dict should contain - at least musicbrainz_id and user_id - listens: a list of listens to be submitted - listen_type: the type of listen (single, import, playing_now) - """ - username = user['musicbrainz_id'] - user_metadata = SubmitListenUserMetadata(user_id=user['user_id'], musicbrainz_id=username) - retries = 10 - while retries >= 0: - try: - current_app.logger.debug('Submitting %d listens for user %s', len(listens), username) - insert_payload(listens, user_metadata, listen_type=listen_type) - current_app.logger.debug('Submitted!') - break - except (InternalServerError, ServiceUnavailable) as e: - retries -= 1 - current_app.logger.error('ISE while trying to import listens for %s: %s', username, str(e)) - if retries == 0: - raise ExternalServiceError('ISE while trying to import listens: %s', str(e)) - - -def parse_and_validate_spotify_plays(plays, listen_type): - """ Converts and validates the listens received from the Spotify API. - - Args: - plays: a list of items received from Spotify - listen_type: the type of the plays (import or playing now) - - Returns: - tuple of (a list of valid listens to submit to ListenBrainz, timestamp of latest listen) - """ - listens = [] - latest_listen_ts = None - - for play in plays: - listen = _convert_spotify_play_to_listen(play, listen_type=listen_type) - - if listen_type == LISTEN_TYPE_IMPORT and \ - (latest_listen_ts is None or listen['listened_at'] > latest_listen_ts): - latest_listen_ts = listen['listened_at'] - - try: - listens.append(validate_listen(listen, listen_type)) - except ListenValidationError: - pass - return listens, latest_listen_ts - - -def process_one_user(user: dict, service: SpotifyService) -> int: - """ Get recently played songs for this user and submit them to ListenBrainz. - - Args: - user (spotify.Spotify): the user whose plays are to be imported. - service (listenbrainz.domain.spotify.SpotifyService): service to process users - - Raises: - spotify.SpotifyAPIError: if we encounter errors from the Spotify API. - spotify.SpotifyListenBrainzError: if we encounter a rate limit, even after retrying. - or if we get errors while submitting the data to ListenBrainz - Returns: - the number of recently played listens imported for the user - """ - try: - if service.user_oauth_token_has_expired(user): - user = service.refresh_access_token(user['user_id'], user['refresh_token']) - - listens = [] - latest_listened_at = None - - # If there is no playback, currently_playing will be None. - # There are two playing types, track and episode. We use only the - # track type. Therefore, when the user's playback type is not a track, - # Spotify will set the item field to null which becomes None after - # parsing the JSON. Due to these reasons, we cannot simplify the - # checks below. - currently_playing = get_user_currently_playing(user) - if currently_playing is not None: - currently_playing_item = currently_playing.get('item', None) - if currently_playing_item is not None: - current_app.logger.debug('Received a currently playing track for %s', str(user)) - listens, latest_listened_at = parse_and_validate_spotify_plays( - [currently_playing_item], - LISTEN_TYPE_PLAYING_NOW - ) - if listens: - submit_listens_to_listenbrainz(user, listens, listen_type=LISTEN_TYPE_PLAYING_NOW) - - recently_played = get_user_recently_played(user) - if recently_played is not None and 'items' in recently_played: - listens, latest_listened_at = parse_and_validate_spotify_plays(recently_played['items'], LISTEN_TYPE_IMPORT) - current_app.logger.debug('Received %d tracks for %s', len(listens), str(user)) - - # if we don't have any new listens, return. we don't check whether the listens list is empty here - # because it will empty in both cases where we don't receive any listens and when we receive only - # bad listens. so instead we check latest_listened_at which is None only in case when we received - # nothing from spotify. - if latest_listened_at is None: - service.update_user_import_status(user['user_id']) - return 0 - - submit_listens_to_listenbrainz(user, listens, listen_type=LISTEN_TYPE_IMPORT) - - # we've succeeded so update the last_updated and latest_listened_at field for this user - service.update_latest_listen_ts(user['user_id'], latest_listened_at) - - current_app.logger.info('imported %d listens for %s' % (len(listens), str(user['musicbrainz_id']))) - return len(listens) - - except ExternalServiceInvalidGrantError: - error_message = "It seems like you've revoked permission for us to read your spotify account" - service.update_user_import_status(user_id=user['user_id'], error=error_message) - if not current_app.config['TESTING']: - notify_error(user['musicbrainz_id'], error_message) - # user has revoked authorization through spotify ui or deleted their spotify account etc. - # - # we used to remove spotify access tokens from our database whenever we detected token revocation - # at one point. but one day spotify had a downtime while resulted in false revocation errors, and - # we ended up deleting most of our users' spotify access tokens. now we don't remove the token from - # database. this is actually more resilient and without downsides. if a user actually revoked their - # token, then its useless anyway so doesn't matter if we remove it. and if it is a false revocation - # error, we are saved! :) in any case, we do set an error message for the user in the database - # so that we can skip in future runs and notify them to reconnect if they want. - raise ExternalServiceError("User has revoked spotify authorization") - - except ExternalServiceAPIError as e: - # if it is an error from the Spotify API, show the error message to the user - service.update_user_import_status(user_id=user['user_id'], error=str(e)) - if not current_app.config['TESTING']: - notify_error(user['musicbrainz_id'], str(e)) - raise ExternalServiceError("Could not refresh user token from spotify") - - -def process_all_spotify_users(): - """ Get a batch of users to be processed and import their Spotify plays. - - Returns: - (success, failure) where - success: the number of users whose plays were successfully imported. - failure: the number of users for whom we faced errors while importing. - """ - - global _listens_imported_since_last_update, _metric_submission_time - - service = SpotifyService() - try: - users = service.get_active_users_to_process() - except DatabaseException as e: - listenbrainz.webserver.db_conn.rollback() - current_app.logger.error('Cannot get list of users due to error %s', str(e), exc_info=True) - return 0, 0 - - if not users: - return 0, 0 - - current_app.logger.info('Process %d users...' % len(users)) - success = 0 - failure = 0 - for u in users: - try: - _listens_imported_since_last_update += process_one_user(u, service) - success += 1 - except (DatabaseException, DatabaseError, SQLAlchemyError): - listenbrainz.webserver.db_conn.rollback() - current_app.logger.error('spotify_reader could not import listens for user %s:', - u['musicbrainz_id'], exc_info=True) - except Exception: - current_app.logger.error('spotify_reader could not import listens for user %s:', - u['musicbrainz_id'], exc_info=True) - failure += 1 - - if time.monotonic() > _metric_submission_time: - _metric_submission_time += METRIC_UPDATE_INTERVAL - metrics.set("spotify_reader", imported_listens=_listens_imported_since_last_update) - _listens_imported_since_last_update = 0 - - current_app.logger.info('Processed %d users successfully!', success) - current_app.logger.info('Encountered errors while processing %d users.', failure) - return success, failure - - -def main(): - app = listenbrainz.webserver.create_app() - with app.app_context(): - current_app.logger.info('Spotify Reader started...') - while True: - t = time.monotonic() - success, failure = process_all_spotify_users() - total_users = success + failure - if total_users > 0: - total_time = time.monotonic() - t - avg_time = total_time / total_users - metrics.set("spotify_reader", - users_processed=total_users, - time_to_process_all_users=total_time, - time_to_process_one_user=avg_time) - current_app.logger.info('All %d users in batch have been processed.', total_users) - current_app.logger.info('Total time taken: %.2f s, average time per user: %.2f s.', total_time, avg_time) - time.sleep(10) - - -if __name__ == '__main__': - main() diff --git a/listenbrainz/tests/integration/test_spotify_read_listens.py b/listenbrainz/tests/integration/test_spotify_read_listens.py index c00a0d2061..dbe3109dd2 100644 --- a/listenbrainz/tests/integration/test_spotify_read_listens.py +++ b/listenbrainz/tests/integration/test_spotify_read_listens.py @@ -8,7 +8,7 @@ from data.model.external_service import ExternalServiceType from listenbrainz.listenstore.timescale_utils import recalculate_all_user_data -from listenbrainz.spotify_updater import spotify_read_listens +from listenbrainz.listens_importer import spotify_read_listens from listenbrainz.tests.integration import ListenAPIIntegrationTestCase from listenbrainz.db import external_service_oauth diff --git a/listenbrainz/webserver/templates/emails/listens_importer_error.txt b/listenbrainz/webserver/templates/emails/listens_importer_error.txt new file mode 100644 index 0000000000..c9d628f319 --- /dev/null +++ b/listenbrainz/webserver/templates/emails/listens_importer_error.txt @@ -0,0 +1,10 @@ +Hi! + +We encountered an error while importing your listens from {{service}}. +The error was as follows: "{{error}}" + +Please take a look at your {{service}} import page ({{link}}) +for more information. + +Best, +The ListenBrainz Team diff --git a/listenbrainz/webserver/templates/emails/spotify_import_error.txt b/listenbrainz/webserver/templates/emails/spotify_import_error.txt deleted file mode 100644 index 0576f4a11b..0000000000 --- a/listenbrainz/webserver/templates/emails/spotify_import_error.txt +++ /dev/null @@ -1,10 +0,0 @@ -Hi! - -We encountered an error while importing your listens from Spotify. -The error was as follows: "{{error}}" - -Please take a look at your Spotify import page ({{link}}) -for more information. - -Best, -The ListenBrainz Team diff --git a/listenbrainz/webserver/views/settings.py b/listenbrainz/webserver/views/settings.py index 45c767eaa1..fd1b1162b8 100644 --- a/listenbrainz/webserver/views/settings.py +++ b/listenbrainz/webserver/views/settings.py @@ -1,36 +1,30 @@ import json -import os.path from datetime import datetime -import orjson -import json -from flask import Blueprint, Response, render_template, request, url_for, \ - redirect, current_app, jsonify, stream_with_context, send_file +from flask import Blueprint, render_template, request, url_for, \ + redirect, current_app, jsonify from flask_login import current_user, login_required -from sqlalchemy import text from werkzeug.exceptions import NotFound, BadRequest -import listenbrainz.db.feedback as db_feedback import listenbrainz.db.user as db_user import listenbrainz.db.user_setting as db_usersetting from data.model.external_service import ExternalServiceType from listenbrainz.background.background_tasks import add_task -from listenbrainz.db import listens_importer -from listenbrainz.db.missing_musicbrainz_data import get_user_missing_musicbrainz_data from listenbrainz.db.exceptions import DatabaseException +from listenbrainz.db.missing_musicbrainz_data import get_user_missing_musicbrainz_data from listenbrainz.domain.apple import AppleService from listenbrainz.domain.critiquebrainz import CritiqueBrainzService, CRITIQUEBRAINZ_SCOPES from listenbrainz.domain.external_service import ExternalService, ExternalServiceInvalidGrantError +from listenbrainz.domain.lastfm import LastfmService from listenbrainz.domain.musicbrainz import MusicBrainzService from listenbrainz.domain.soundcloud import SoundCloudService from listenbrainz.domain.spotify import SpotifyService, SPOTIFY_LISTEN_PERMISSIONS, SPOTIFY_IMPORT_PERMISSIONS from listenbrainz.webserver import db_conn, ts_conn -from listenbrainz.webserver import timescale_connection from listenbrainz.webserver.decorators import web_listenstore_needed -from listenbrainz.webserver.errors import APIServiceUnavailable, APINotFound, APIForbidden, APIInternalServerError +from listenbrainz.webserver.errors import APIServiceUnavailable, APINotFound, APIForbidden, APIInternalServerError, \ + APIBadRequest from listenbrainz.webserver.login import api_login_required - settings_bp = Blueprint("settings", __name__) @@ -67,16 +61,6 @@ def set_troi_prefs(): return jsonify(data) -@settings_bp.route("/resetlatestimportts/", methods=["POST"]) -@api_login_required -def reset_latest_import_timestamp(): - try: - listens_importer.update_latest_listened_at(db_conn, current_user.id, ExternalServiceType.LASTFM, 0) - return jsonify({"success": True}) - except DatabaseException: - raise APIInternalServerError("Something went wrong! Unable to reset latest import timestamp right now.") - - @settings_bp.route("/import/", methods=["POST"]) @api_login_required def import_data(): @@ -161,12 +145,14 @@ def _get_service_or_raise_404(name: str, include_mb=False, exclude_apple=False) return CritiqueBrainzService() elif service == ExternalServiceType.SOUNDCLOUD: return SoundCloudService() + elif service == ExternalServiceType.LASTFM: + return LastfmService() elif not exclude_apple and service == ExternalServiceType.APPLE: return AppleService() elif include_mb and service == ExternalServiceType.MUSICBRAINZ: return MusicBrainzService() except KeyError: - raise NotFound("Service %s is invalid." % name) + raise NotFound("Service %s is invalid." % (name,)) @settings_bp.route('/music-services/details/', methods=['POST']) @@ -198,11 +184,16 @@ def music_services_details(): apple_user = apple_service.get_user(current_user.id) current_apple_permissions = "listen" if apple_user and apple_user["refresh_token"] else "disable" + lastfm_service = LastfmService() + lastfm_user = lastfm_service.get_user(current_user.id) + current_lastfm_permissions = "import" if lastfm_user else "disable" + data = { "current_spotify_permissions": current_spotify_permissions, "current_critiquebrainz_permissions": current_critiquebrainz_permissions, "current_soundcloud_permissions": current_soundcloud_permissions, "current_apple_permissions": current_apple_permissions, + "current_lastfm_permissions": current_lastfm_permissions, } return jsonify(data) @@ -242,6 +233,34 @@ def refresh_service_token(service_name: str): return jsonify({"access_token": user["access_token"]}) +@settings_bp.route('/music-services//connect/', methods=['POST']) +@api_login_required +def music_services_connect(service_name: str): + """ Connect last.fm/libre.fm account to ListenBrainz user. """ + # TODO: add support for libre.fm + if service_name.lower() != "lastfm": + raise APINotFound("Service %s is invalid." % (service_name,)) + + data = request.json + if "external_user_id" not in data: + raise APIBadRequest("Missing 'external_user_id' in request.") + if "latest_listened_at" not in data: + raise APIBadRequest("Missing 'latest_listened_at' in request.") + + try: + latest_listened_at = datetime.fromisoformat(data["latest_listened_at"]) + except (ValueError, TypeError): + raise APIBadRequest(f"Value of latest_listened_at '{data['latest_listened_at']} is invalid.") + + # TODO: make last.fm start import timestamp configurable + service = LastfmService() + service.add_new_user(current_user.id, { + "external_user_id": data["external_user_id"], + "latest_listened_at": latest_listened_at, + }) + return jsonify({"success": True}) + + @settings_bp.route('/music-services//disconnect/', methods=['POST']) @api_login_required def music_services_disconnect(service_name: str):