From 5a8672952c08e7e40da570d16a6b497f9bc60784 Mon Sep 17 00:00:00 2001 From: Him188 Date: Thu, 7 Nov 2024 16:02:43 +0000 Subject: [PATCH] =?UTF-8?q?=E5=9C=A8=E5=8A=A0=E8=BD=BD=E6=94=B6=E8=97=8F?= =?UTF-8?q?=E6=97=B6=E5=B9=B6=E8=A1=8C=E6=9F=A5=E8=AF=A2=E5=89=A7=E9=9B=86?= =?UTF-8?q?=E8=BF=9B=E5=BA=A6=E4=BB=A5=E6=8F=90=E9=AB=98=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../episode/EpisodeCollectionRepository.kt | 12 +++-- .../subject/SubjectCollectionRepository.kt | 54 +++++++++++++++---- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/app/shared/app-data/src/commonMain/kotlin/data/repository/episode/EpisodeCollectionRepository.kt b/app/shared/app-data/src/commonMain/kotlin/data/repository/episode/EpisodeCollectionRepository.kt index 7495284729..ac9eea97bb 100644 --- a/app/shared/app-data/src/commonMain/kotlin/data/repository/episode/EpisodeCollectionRepository.kt +++ b/app/shared/app-data/src/commonMain/kotlin/data/repository/episode/EpisodeCollectionRepository.kt @@ -40,6 +40,7 @@ import me.him188.ani.datasources.api.EpisodeType.MainStory import me.him188.ani.datasources.api.topic.UnifiedCollectionType import me.him188.ani.utils.platform.currentTimeMillis import kotlin.coroutines.CoroutineContext +import kotlin.time.Duration import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.milliseconds @@ -49,6 +50,7 @@ class EpisodeCollectionRepository( private val bangumiEpisodeService: BangumiEpisodeService, private val enableAllEpisodeTypes: Flow, private val defaultDispatcher: CoroutineContext = Dispatchers.Default, + private val cacheExpiry: Duration = 1.hours, ) : Repository { private val epTypeFilter get() = enableAllEpisodeTypes.map { if (it) null else MainStory } @@ -72,14 +74,16 @@ class EpisodeCollectionRepository( * 获取指定条目的所有剧集信息, 如果没有则从网络获取并缓存 */ fun subjectEpisodeCollectionInfosFlow( - subjectId: Int + subjectId: Int, + allowCached: Boolean = true, ): Flow> = epTypeFilter.flatMapLatest { epType -> if (subjectDao.findById(subjectId).first()?.totalEpisodes == 0) { return@flatMapLatest flowOf(emptyList()) } episodeCollectionDao.filterBySubjectId(subjectId, epType).mapLatest { episodes -> - if (episodes.isNotEmpty() && - (currentTimeMillis() - (episodes.maxOfOrNull { it.lastUpdated } ?: 0)).milliseconds <= 1.hours + if (allowCached && + episodes.isNotEmpty() && + (currentTimeMillis() - (episodes.maxOfOrNull { it.lastUpdated } ?: 0)).milliseconds <= cacheExpiry ) { // 有有效缓存则直接返回 return@mapLatest episodes.map { it.toEpisodeCollectionInfo() } @@ -141,7 +145,7 @@ class EpisodeCollectionRepository( ) : RemoteMediator() { override suspend fun initialize(): InitializeAction { return withContext(defaultDispatcher) { - if ((currentTimeMillis() - episodeCollectionDao.lastUpdated()).milliseconds > 1.hours) { + if ((currentTimeMillis() - episodeCollectionDao.lastUpdated()).milliseconds > cacheExpiry) { InitializeAction.LAUNCH_INITIAL_REFRESH } else { InitializeAction.SKIP_INITIAL_REFRESH diff --git a/app/shared/app-data/src/commonMain/kotlin/data/repository/subject/SubjectCollectionRepository.kt b/app/shared/app-data/src/commonMain/kotlin/data/repository/subject/SubjectCollectionRepository.kt index dad7643ec4..24d22671ec 100644 --- a/app/shared/app-data/src/commonMain/kotlin/data/repository/subject/SubjectCollectionRepository.kt +++ b/app/shared/app-data/src/commonMain/kotlin/data/repository/subject/SubjectCollectionRepository.kt @@ -18,12 +18,16 @@ import androidx.paging.RemoteMediator import androidx.paging.map import io.ktor.client.plugins.ResponseException import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.retry +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withContext import me.him188.ani.app.data.models.episode.EpisodeCollectionInfo import me.him188.ani.app.data.models.episode.isKnownCompleted @@ -55,6 +59,7 @@ import me.him188.ani.utils.coroutines.IO_ import me.him188.ani.utils.logging.logger import me.him188.ani.utils.platform.currentTimeMillis import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.cancellation.CancellationException import kotlin.time.Duration.Companion.hours import kotlin.time.Duration.Companion.milliseconds @@ -164,8 +169,11 @@ class SubjectCollectionRepositoryImpl( ) } - private suspend fun getSubjectEpisodeCollections(subjectId: Int) = - episodeCollectionRepository.subjectEpisodeCollectionInfosFlow(subjectId).flowOn(defaultDispatcher).first() + private suspend fun getSubjectEpisodeCollections( + subjectId: Int, + allowCached: Boolean = true, + ) = episodeCollectionRepository.subjectEpisodeCollectionInfosFlow(subjectId, allowCached).flowOn(defaultDispatcher) + .first() override fun mostRecentlyUpdatedSubjectCollectionsFlow( limit: Int, @@ -193,7 +201,7 @@ class SubjectCollectionRepositoryImpl( ).flow.map { data -> data.map { entity -> entity.toSubjectCollectionInfo( - episodes = getSubjectEpisodeCollections(entity.subjectId), + episodes = getSubjectEpisodeCollections(entity.subjectId), // 通常会读取缓存, 因为 [SubjectCollectionRemoteMediator] 会提前查询这个 currentDate = getCurrentDate(), ) } @@ -269,14 +277,38 @@ class SubjectCollectionRepositoryImpl( limit = state.config.pageSize, ) - for (collection in items) { - val subject = collection.batchSubjectDetails - subjectCollectionDao.upsert( - subject.toEntity( - collection.collection?.type.toCollectionType(), - collection.collection.toSelfRatingInfo(), - ), - ) + coroutineScope { + // 提前'批量'查询剧集收藏状态, 防止在收藏页显示结果时一个一个查导致太慢 + val concurrency = Semaphore(4) + items.forEach { subjectCollection -> + launch { + try { + subjectCollection.collection?.subjectId?.let { subjectId -> + concurrency.withPermit { + getSubjectEpisodeCollections( + subjectId, + allowCached = false, // SubjectCollectionRemoteMediator 是强制刷新 + ) // side-effect: update database + } + } + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + // 这里我们只是批量提前查询, 不一定需要成功. 后面等收藏页真正需要时再来重试和处理对应异常 + logger.error("Failed to fetch episode collections", e) + } + } + } + + for (collection in items) { + val subject = collection.batchSubjectDetails + subjectCollectionDao.upsert( + subject.toEntity( + collection.collection?.type.toCollectionType(), + collection.collection.toSelfRatingInfo(), + ), + ) + } } MediatorResult.Success(endOfPaginationReached = items.isEmpty())