Bifurcation 0

Replace RxJava in HttpPageLoader downloader (#8955)

* Convert downloader Observable to flow

Uses `runInterruptible` to turn the blocking call to `queue.take()`
into a cancellable call.

Flow collection is ended by cancelling the scope in `recycle`. This
means the `HttpPageLoader` can't be reused after calling `recycle`,
but this was true with the `Observable` as well.)

* Convert load Observables to suspending function

Inlining the Observables allows for some simplification of the error
handling. Behavior should be otherwise identical.

* Convert cleanup Completable to coroutine

Uses global `launchIO`, not ideal but similar to previous behavior.
Can't be scheduled on the local `scope` as this runs after `scope` is
Cette révision appartient à :
Two-Ai 2023-01-21 16:46:16 -05:00 révisé par GitHub
Parent a179327d9d
révision e4bc8990fb
Aucune clé n'a été trouvée pour cette signature dans la base de données
ID de la clé GPG: 4AEE18F83AFDEB23

Voir le fichier

@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter
import eu.kanade.tachiyomi.ui.reader.model.ReaderPage
import eu.kanade.tachiyomi.util.lang.plusAssign
import eu.kanade.tachiyomi.util.system.logcat
import eu.kanade.tachiyomi.util.lang.awaitSingle
import eu.kanade.tachiyomi.util.lang.launchIO
import kotlinx.coroutines.CancellationException
import logcat.LogPriority
import rx.Completable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import rx.Observable
import rx.schedulers.Schedulers
import rx.subjects.PublishSubject
import rx.subjects.SerializedSubject
import rx.subscriptions.CompositeSubscription
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
import java.util.concurrent.PriorityBlockingQueue
@ -31,33 +35,27 @@ class HttpPageLoader(
private val chapterCache: ChapterCache = Injekt.get(),
) : PageLoader() {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
* A queue used to manage requests one by one while allowing priorities.
private val queue = PriorityBlockingQueue<PriorityPage>()
* Current active subscriptions.
private val subscriptions = CompositeSubscription()
private val preloadSize = 4
init {
subscriptions += Observable.defer { Observable.just(queue.take().page) }
.filter { it.status == Page.State.QUEUE }
.concatMap { source.fetchImageFromCacheThenNet(it) }
{ error ->
if (error !is InterruptedException) {
logcat(LogPriority.ERROR, error)
scope.launchIO {
flow {
while (true) {
emit(runInterruptible { queue.take() }.page)
.filter { it.status == Page.State.QUEUE }
.collect {
@ -65,21 +63,23 @@ class HttpPageLoader(
override fun recycle() {
// Cache current page list progress for online chapters to allow a faster reopen
val pages = chapter.pages
if (pages != null) {
.fromAction {
launchIO {
try {
// Convert to pages without reader information
val pagesToSave = { Page(it.index, it.url, it.imageUrl) }
chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave)
} catch (e: Throwable) {
if (e is CancellationException) {
throw e
@ -192,61 +192,32 @@ class HttpPageLoader(
* Returns an observable of the page with the downloaded image.
* Loads the page, retrieving the image URL and downloading the image if necessary.
* Downloaded images are stored in the chapter cache.
* @param page the page whose source image has to be downloaded.
private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable<ReaderPage> {
return if (page.imageUrl.isNullOrEmpty()) {
getImageUrl(page).flatMap { getCachedImage(it) }
} else {
private suspend fun loadPage(page: ReaderPage) {
try {
if (page.imageUrl.isNullOrEmpty()) {
page.status = Page.State.LOAD_PAGE
page.imageUrl = source.fetchImageUrl(page).awaitSingle()
val imageUrl = page.imageUrl!!
if (!chapterCache.isImageInCache(imageUrl)) {
page.status = Page.State.DOWNLOAD_IMAGE
val imageResponse = source.fetchImage(page).awaitSingle()
chapterCache.putImageToCache(imageUrl, imageResponse)
} = { chapterCache.getImageFile(imageUrl).inputStream() }
page.status = Page.State.READY
} catch (e: Throwable) {
page.status = Page.State.ERROR
if (e is CancellationException) {
throw e
private fun HttpSource.getImageUrl(page: ReaderPage): Observable<ReaderPage> {
page.status = Page.State.LOAD_PAGE
return fetchImageUrl(page)
.doOnError { page.status = Page.State.ERROR }
.onErrorReturn { null }
.doOnNext { page.imageUrl = it }
.map { page }
* Returns an observable of the page that gets the image from the chapter or fallbacks to
* network and copies it to the cache calling [cacheImage].
* @param page the page.
private fun HttpSource.getCachedImage(page: ReaderPage): Observable<ReaderPage> {
val imageUrl = page.imageUrl ?: return Observable.just(page)
return Observable.just(page)
.flatMap {
if (!chapterCache.isImageInCache(imageUrl)) {
} else {
.doOnNext { = { chapterCache.getImageFile(imageUrl).inputStream() }
page.status = Page.State.READY
.doOnError { page.status = Page.State.ERROR }
.onErrorReturn { page }
* Returns an observable of the page that downloads the image to [ChapterCache].
* @param page the page.
private fun HttpSource.cacheImage(page: ReaderPage): Observable<ReaderPage> {
page.status = Page.State.DOWNLOAD_IMAGE
return fetchImage(page)
.doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) }
.map { page }