Skip to content

Commit

Permalink
在加载收藏时并行查询剧集进度以提高速度
Browse files Browse the repository at this point in the history
  • Loading branch information
Him188 committed Nov 7, 2024
1 parent 554e6e0 commit 5a86729
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import me.him188.ani.datasources.api.EpisodeType.MainStory
import me.him188.ani.datasources.api.topic.UnifiedCollectionType
import me.him188.ani.utils.platform.currentTimeMillis
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds

Expand All @@ -49,6 +50,7 @@ class EpisodeCollectionRepository(
private val bangumiEpisodeService: BangumiEpisodeService,
private val enableAllEpisodeTypes: Flow<Boolean>,
private val defaultDispatcher: CoroutineContext = Dispatchers.Default,
private val cacheExpiry: Duration = 1.hours,
) : Repository {
private val epTypeFilter get() = enableAllEpisodeTypes.map { if (it) null else MainStory }

Expand All @@ -72,14 +74,16 @@ class EpisodeCollectionRepository(
* 获取指定条目的所有剧集信息, 如果没有则从网络获取并缓存
*/
fun subjectEpisodeCollectionInfosFlow(
subjectId: Int
subjectId: Int,
allowCached: Boolean = true,
): Flow<List<EpisodeCollectionInfo>> = epTypeFilter.flatMapLatest { epType ->
if (subjectDao.findById(subjectId).first()?.totalEpisodes == 0) {
return@flatMapLatest flowOf(emptyList())
}
episodeCollectionDao.filterBySubjectId(subjectId, epType).mapLatest { episodes ->
if (episodes.isNotEmpty() &&
(currentTimeMillis() - (episodes.maxOfOrNull { it.lastUpdated } ?: 0)).milliseconds <= 1.hours
if (allowCached &&
episodes.isNotEmpty() &&
(currentTimeMillis() - (episodes.maxOfOrNull { it.lastUpdated } ?: 0)).milliseconds <= cacheExpiry
) {
// 有有效缓存则直接返回
return@mapLatest episodes.map { it.toEpisodeCollectionInfo() }
Expand Down Expand Up @@ -141,7 +145,7 @@ class EpisodeCollectionRepository(
) : RemoteMediator<Int, T>() {
override suspend fun initialize(): InitializeAction {
return withContext(defaultDispatcher) {
if ((currentTimeMillis() - episodeCollectionDao.lastUpdated()).milliseconds > 1.hours) {
if ((currentTimeMillis() - episodeCollectionDao.lastUpdated()).milliseconds > cacheExpiry) {
InitializeAction.LAUNCH_INITIAL_REFRESH
} else {
InitializeAction.SKIP_INITIAL_REFRESH
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ import androidx.paging.RemoteMediator
import androidx.paging.map
import io.ktor.client.plugins.ResponseException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import me.him188.ani.app.data.models.episode.EpisodeCollectionInfo
import me.him188.ani.app.data.models.episode.isKnownCompleted
Expand Down Expand Up @@ -55,6 +59,7 @@ import me.him188.ani.utils.coroutines.IO_
import me.him188.ani.utils.logging.logger
import me.him188.ani.utils.platform.currentTimeMillis
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds

Expand Down Expand Up @@ -164,8 +169,11 @@ class SubjectCollectionRepositoryImpl(
)
}

private suspend fun getSubjectEpisodeCollections(subjectId: Int) =
episodeCollectionRepository.subjectEpisodeCollectionInfosFlow(subjectId).flowOn(defaultDispatcher).first()
private suspend fun getSubjectEpisodeCollections(
subjectId: Int,
allowCached: Boolean = true,
) = episodeCollectionRepository.subjectEpisodeCollectionInfosFlow(subjectId, allowCached).flowOn(defaultDispatcher)
.first()

override fun mostRecentlyUpdatedSubjectCollectionsFlow(
limit: Int,
Expand Down Expand Up @@ -193,7 +201,7 @@ class SubjectCollectionRepositoryImpl(
).flow.map { data ->
data.map { entity ->
entity.toSubjectCollectionInfo(
episodes = getSubjectEpisodeCollections(entity.subjectId),
episodes = getSubjectEpisodeCollections(entity.subjectId), // 通常会读取缓存, 因为 [SubjectCollectionRemoteMediator] 会提前查询这个
currentDate = getCurrentDate(),
)
}
Expand Down Expand Up @@ -269,14 +277,38 @@ class SubjectCollectionRepositoryImpl(
limit = state.config.pageSize,
)

for (collection in items) {
val subject = collection.batchSubjectDetails
subjectCollectionDao.upsert(
subject.toEntity(
collection.collection?.type.toCollectionType(),
collection.collection.toSelfRatingInfo(),
),
)
coroutineScope {
// 提前'批量'查询剧集收藏状态, 防止在收藏页显示结果时一个一个查导致太慢
val concurrency = Semaphore(4)
items.forEach { subjectCollection ->
launch {
try {
subjectCollection.collection?.subjectId?.let { subjectId ->
concurrency.withPermit {
getSubjectEpisodeCollections(
subjectId,
allowCached = false, // SubjectCollectionRemoteMediator 是强制刷新
) // side-effect: update database
}
}
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
// 这里我们只是批量提前查询, 不一定需要成功. 后面等收藏页真正需要时再来重试和处理对应异常
logger.error("Failed to fetch episode collections", e)
}
}
}

for (collection in items) {
val subject = collection.batchSubjectDetails
subjectCollectionDao.upsert(
subject.toEntity(
collection.collection?.type.toCollectionType(),
collection.collection.toSelfRatingInfo(),
),
)
}
}

MediatorResult.Success(endOfPaginationReached = items.isEmpty())
Expand Down

0 comments on commit 5a86729

Please sign in to comment.