@ -30,6 +30,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
@ -61,8 +62,10 @@ import org.koitharu.kotatsu.core.util.ext.getWorkInputData
import org.koitharu.kotatsu.core.util.ext.getWorkSpec
import org.koitharu.kotatsu.core.util.ext.ifNullOrEmpty
import org.koitharu.kotatsu.core.util.ext.printStackTraceDebug
import org.koitharu.kotatsu.core.util.ext.withTicker
import org.koitharu.kotatsu.core.util.ext.writeAllCancellable
import org.koitharu.kotatsu.core.util.progress.TimeLeftEstimator
import org.koitharu.kotatsu.core.util.progress.RealtimeEtaEstimator
import org.koitharu.kotatsu.download.domain.DownloadProgress
import org.koitharu.kotatsu.download.domain.DownloadState
import org.koitharu.kotatsu.local.data.LocalMangaRepository
import org.koitharu.kotatsu.local.data.LocalStorageChanges
@ -70,6 +73,7 @@ import org.koitharu.kotatsu.local.data.PagesCache
import org.koitharu.kotatsu.local.data.TempFileFilter
import org.koitharu.kotatsu.local.data.input.LocalMangaInput
import org.koitharu.kotatsu.local.data.output.LocalMangaOutput
import org.koitharu.kotatsu.local.domain.MangaLock
import org.koitharu.kotatsu.local.domain.model.LocalManga
import org.koitharu.kotatsu.parsers.exception.TooManyRequestExceptions
import org.koitharu.kotatsu.parsers.model.Manga
@ -91,6 +95,7 @@ class DownloadWorker @AssistedInject constructor(
@MangaHttpClient private val okHttp : OkHttpClient ,
private val cache : PagesCache ,
private val localMangaRepository : LocalMangaRepository ,
private val mangaLock : MangaLock ,
private val mangaDataRepository : MangaDataRepository ,
private val mangaRepositoryFactory : MangaRepository . Factory ,
private val settings : AppSettings ,
@ -108,7 +113,7 @@ class DownloadWorker @AssistedInject constructor(
private val currentState : DownloadState
get ( ) = checkNotNull ( lastPublishedState )
private val timeLeftEstimator = TimeLeft Estimator( )
private val etaEstimator = RealtimeEta Estimator( )
private val notificationThrottler = Throttler ( 400 )
override suspend fun doWork ( ) : Result {
@ -130,17 +135,16 @@ class DownloadWorker @AssistedInject constructor(
notificationManager . notify ( id . hashCode ( ) , notification )
}
Result . failure (
currentState . copy ( eta = - 1L ). toWorkData ( ) ,
currentState . copy ( eta = - 1L , isStuck = false ). toWorkData ( ) ,
)
} catch ( e : IOException ) {
e . printStackTraceDebug ( )
Result . retry ( )
} catch ( e : Exception ) {
e . printStackTraceDebug ( )
Result . failure (
currentState . copy (
error = e . getDisplayMessage ( applicationContext . resources ) ,
error = e ,
errorMessage = e . getDisplayMessage ( applicationContext . resources ) ,
eta = - 1L ,
isStuck = false ,
) . toWorkData ( ) ,
)
} finally {
@ -169,7 +173,7 @@ class DownloadWorker @AssistedInject constructor(
var manga = subject
val chaptersToSkip = excludedIds . toMutableSet ( )
val pausingReceiver = PausingReceiver ( id , PausingHandle . current ( ) )
withManga Lock( manga ) {
mangaLock. withLock( manga ) {
ContextCompat . registerReceiver (
applicationContext ,
pausingReceiver ,
@ -229,15 +233,23 @@ class DownloadWorker @AssistedInject constructor(
}
}
}
} . collect {
publishState (
currentState . copy (
} . map {
DownloadProgress (
totalChapters = chapters . size ,
currentChapter = chapterIndex ,
totalPages = pages . size ,
currentPage = pageCounter . incrementAndGet ( ) ,
currentPage = pageCounter . getAndIncrement ( ) ,
)
} . withTicker ( 2L , TimeUnit . SECONDS ) . collect { progress ->
publishState (
currentState . copy (
totalChapters = progress . totalChapters ,
currentChapter = progress . currentChapter ,
totalPages = progress . totalPages ,
currentPage = progress . currentPage ,
isIndeterminate = false ,
eta = timeLeftEstimator . getEta ( ) ,
eta = etaEstimator . getEta ( ) ,
isStuck = etaEstimator . isStuck ( ) ,
) ,
)
}
@ -248,15 +260,20 @@ class DownloadWorker @AssistedInject constructor(
}
publishState ( currentState . copy ( downloadedChapters = currentState . downloadedChapters + 1 ) )
}
publishState ( currentState . copy ( isIndeterminate = true , eta = - 1L ))
publishState ( currentState . copy ( isIndeterminate = true , eta = - 1L , isStuck = false ))
output . mergeWithExisting ( )
output . finish ( )
val localManga = LocalMangaInput . of ( output . rootFile ) . getManga ( )
localStorageChanges . emit ( localManga )
publishState ( currentState . copy ( localManga = localManga , eta = - 1L ))
publishState ( currentState . copy ( localManga = localManga , eta = - 1L , isStuck = false ))
} catch ( e : Exception ) {
if ( e !is CancellationException ) {
publishState ( currentState . copy ( error = e . getDisplayMessage ( applicationContext . resources ) ) )
publishState (
currentState . copy (
error = e ,
errorMessage = e . getDisplayMessage ( applicationContext . resources ) ,
) ,
)
}
throw e
} finally {
@ -281,12 +298,19 @@ class DownloadWorker @AssistedInject constructor(
try {
return block ( )
} catch ( e : IOException ) {
if ( countDown <= 0 ) {
val retryDelay = if ( e is TooManyRequestExceptions ) {
e . getRetryDelay ( )
} else {
DOWNLOAD _ERROR _DELAY
}
if ( countDown <= 0 || retryDelay < 0 || retryDelay > MAX _RETRY _DELAY ) {
publishState (
currentState . copy (
isPaused = true ,
error = e . getDisplayMessage ( applicationContext . resources ) ,
error = e ,
errorMessage = e . getDisplayMessage ( applicationContext . resources ) ,
eta = - 1L ,
isStuck = false ,
) ,
)
countDown = MAX _FAILSAFE _ATTEMPTS
@ -298,15 +322,10 @@ class DownloadWorker @AssistedInject constructor(
return null
}
} finally {
publishState ( currentState . copy ( isPaused = false , error = null ))
publishState ( currentState . copy ( isPaused = false , error = null , errorMessage = null ))
}
} else {
countDown --
val retryDelay = if ( e is TooManyRequestExceptions ) {
e . retryAfter + DOWNLOAD _ERROR _DELAY
} else {
DOWNLOAD _ERROR _DELAY
}
delay ( retryDelay )
}
}
@ -316,7 +335,7 @@ class DownloadWorker @AssistedInject constructor(
private suspend fun checkIsPaused ( ) {
val pausingHandle = PausingHandle . current ( )
if ( pausingHandle . isPaused ) {
publishState ( currentState . copy ( isPaused = true , eta = - 1L ))
publishState ( currentState . copy ( isPaused = true , eta = - 1L , isStuck = false ))
try {
pausingHandle . awaitResumed ( )
} finally {
@ -354,9 +373,9 @@ class DownloadWorker @AssistedInject constructor(
val previousState = currentState
lastPublishedState = state
if ( previousState . isParticularProgress && state . isParticularProgress ) {
timeLeftEstimator. tick ( state . progress , state . max )
etaEstimator. onProgressChanged ( state . progress , state . max )
} else {
timeLeftEstimator. emptyTick ( )
etaEstimator. reset ( )
notificationThrottler . reset ( )
}
val notification = notificationFactory . create ( state )
@ -399,13 +418,6 @@ class DownloadWorker @AssistedInject constructor(
return result
}
private suspend inline fun < T > withMangaLock ( manga : Manga , block : ( ) -> T ) = try {
localMangaRepository . lockManga ( manga . id )
block ( )
} finally {
localMangaRepository . unlockManga ( manga . id )
}
@Reusable
class Scheduler @Inject constructor (
@ApplicationContext private val context : Context ,
@ -458,15 +470,21 @@ class DownloadWorker @AssistedInject constructor(
workManager . cancelAllWorkByTag ( TAG ) . await ( )
}
fun pause ( id : UUID ) {
val intent = PausingReceiver . getPauseIntent ( context , id )
context . sendBroadcast ( intent )
}
fun pause ( id : UUID ) = context . sendBroadcast (
PausingReceiver . getPauseIntent ( context , id ) ,
)
fun resume ( id : UUID , skipError : Boolean ) {
val intent = PausingReceiver . getResumeIntent ( context , id , skipError )
context . sendBroadcast ( intent )
}
fun resume ( id : UUID ) = context . sendBroadcast (
PausingReceiver . getResumeIntent ( context , id ) ,
)
fun skip ( id : UUID ) = context . sendBroadcast (
PausingReceiver . getSkipIntent ( context , id ) ,
)
fun skipAll ( id : UUID ) = context . sendBroadcast (
PausingReceiver . getSkipAllIntent ( context , id ) ,
)
suspend fun delete ( id : UUID ) {
workManager . deleteWork ( id )
@ -526,7 +544,8 @@ class DownloadWorker @AssistedInject constructor(
const val MAX _FAILSAFE _ATTEMPTS = 2
const val MAX _PAGES _PARALLELISM = 4
const val DOWNLOAD _ERROR _DELAY = 500L
const val DOWNLOAD _ERROR _DELAY = 2 _000L
const val MAX _RETRY _DELAY = 7 _200 _000L // 2 hours
const val SLOWDOWN _DELAY = 200L
const val MANGA _ID = " manga_id "
const val CHAPTERS _IDS = " chapters "