Comments
Cette révision appartient à :
Parent
de176ee1f2
révision
ad566d672c
1 fichiers modifiés avec 22 ajouts et 11 suppressions
|
@ -1,7 +1,6 @@
|
||||||
package metainfoFetcher
|
package metainfoFetcher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -199,18 +198,28 @@ func (fetcher *MetainfoFetcher) gotResult(r Result) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fetcher *MetainfoFetcher) removeOldFailures() {
|
func (fetcher *MetainfoFetcher) removeOldFailures() {
|
||||||
// Cooldown is disabled
|
|
||||||
if fetcher.baseFailCooldown < 0 {
|
if fetcher.baseFailCooldown < 0 {
|
||||||
|
// XXX: Cooldown is disabled.
|
||||||
|
// this means that if any attempt to fetch metadata fails
|
||||||
|
// it will never be retried. it also means that
|
||||||
|
// fetcher.failedOperations will keep accumulating torrent IDs
|
||||||
|
// that are never freed.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
maxCd := time.Duration(fetcher.maxFailCooldown) * time.Second
|
max := time.Duration(fetcher.maxFailCooldown) * time.Second
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for id, failTime := range fetcher.failedOperations {
|
for id, failTime := range fetcher.failedOperations {
|
||||||
cdMult := int(math.Pow(2, float64(fetcher.numFails[id]-1)))
|
// double the amount of time waited for ever failed attempt.
|
||||||
cd := time.Duration(cdMult*fetcher.baseFailCooldown) * time.Second
|
// | nfailed | cooldown
|
||||||
if cd > maxCd {
|
// | 1 | 2 * base
|
||||||
cd = maxCd
|
// | 2 | 4 * base
|
||||||
|
// | 3 | 8 * base
|
||||||
|
// integers inside fetcher.numFails are never less than or equal to zero
|
||||||
|
mul := 1 << uint(fetcher.numFails[id]-1)
|
||||||
|
cd := time.Duration(mul * fetcher.baseFailCooldown) * time.Second
|
||||||
|
if cd > max {
|
||||||
|
cd = max
|
||||||
}
|
}
|
||||||
|
|
||||||
if failTime.Add(cd).Before(now) {
|
if failTime.Add(cd).Before(now) {
|
||||||
|
@ -222,9 +231,8 @@ func (fetcher *MetainfoFetcher) removeOldFailures() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fetcher *MetainfoFetcher) fillQueue() {
|
func (fetcher *MetainfoFetcher) fillQueue() {
|
||||||
toFill := fetcher.queueSize - len(fetcher.queue)
|
if left := fetcher.queueSize - len(fetcher.queue); left <= 0 {
|
||||||
|
// queue is already full.
|
||||||
if toFill <= 0 {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,13 +281,15 @@ func (fetcher *MetainfoFetcher) fillQueue() {
|
||||||
|
|
||||||
for _, T := range dbTorrents {
|
for _, T := range dbTorrents {
|
||||||
for _, v := range fetcher.queue {
|
for _, v := range fetcher.queue {
|
||||||
|
// skip torrents that are already being processed.
|
||||||
if v.torrent.ID == T.ID {
|
if v.torrent.ID == T.ID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if _, ok := fetcher.failedOperations[T.ID]; ok {
|
if _, ok := fetcher.failedOperations[T.ID]; ok {
|
||||||
// do not start new jobs that have recently failed.
|
// do not start new jobs that have recently failed.
|
||||||
// removeOldFailures() takes care of that.
|
// these are on cooldown and will be removed from
|
||||||
|
// fetcher.failedOperations when time is up.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +313,7 @@ func (fetcher *MetainfoFetcher) run() {
|
||||||
for {
|
for {
|
||||||
fetcher.removeOldFailures()
|
fetcher.removeOldFailures()
|
||||||
fetcher.fillQueue()
|
fetcher.fillQueue()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-fetcher.done:
|
case <-fetcher.done:
|
||||||
log.Infof("Got done signal on main loop, leaving...")
|
log.Infof("Got done signal on main loop, leaving...")
|
||||||
|
|
Référencer dans un nouveau ticket