Add fail cooldown
After a set cooldown, torrents that failed to be fetch will be able to be fetched again. Set FailCooldown to -1 to disable.
Cette révision appartient à :
Parent
83de593a65
révision
cae0026a67
3 fichiers modifiés avec 38 ajouts et 16 suppressions
|
@ -4,6 +4,7 @@ type MetainfoFetcherConfig struct {
|
|||
QueueSize int `json:"queue_size"`
|
||||
Timeout int `json:"timeout"`
|
||||
MaxDays int `json:"max_days"`
|
||||
FailCooldown int `json:"fail_cooldown"`
|
||||
WakeUpInterval int `json:"wake_up_interval"`
|
||||
|
||||
UploadRateLimiter int `json:"upload_rate_limiter"`
|
||||
|
@ -14,9 +15,10 @@ var DefaultMetainfoFetcherConfig = MetainfoFetcherConfig{
|
|||
QueueSize: 10,
|
||||
Timeout: 120, // 2 min
|
||||
MaxDays: 90,
|
||||
FailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again.
|
||||
WakeUpInterval: 300, // 5 min
|
||||
|
||||
UploadRateLimiter: 1024,
|
||||
UploadRateLimiter: 1024, // kbps
|
||||
DownloadRateLimiter: 1024,
|
||||
}
|
||||
|
||||
|
|
|
@ -20,10 +20,11 @@ type MetainfoFetcher struct {
|
|||
queueSize int
|
||||
timeout int
|
||||
maxDays int
|
||||
failCooldown int
|
||||
done chan int
|
||||
queue []*FetchOperation
|
||||
queueMutex sync.Mutex
|
||||
failedOperations map[uint]struct{}
|
||||
failedOperations map[uint]time.Time
|
||||
wakeUp *time.Ticker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
@ -47,8 +48,9 @@ func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher,
|
|||
queueSize: fetcherConfig.QueueSize,
|
||||
timeout: fetcherConfig.Timeout,
|
||||
maxDays: fetcherConfig.MaxDays,
|
||||
failCooldown: fetcherConfig.FailCooldown,
|
||||
done: make(chan int, 1),
|
||||
failedOperations: make(map[uint]struct{}),
|
||||
failedOperations: make(map[uint]time.Time),
|
||||
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
|
||||
}
|
||||
|
||||
|
@ -150,12 +152,28 @@ func (fetcher *MetainfoFetcher) gotResult(r Result) {
|
|||
}
|
||||
|
||||
if !updatedSuccessfully {
|
||||
fetcher.failedOperations[r.operation.torrent.ID] = struct{}{}
|
||||
fetcher.failedOperations[r.operation.torrent.ID] = time.Now()
|
||||
}
|
||||
|
||||
fetcher.removeFromQueue(r.operation)
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) removeOldFailures() {
|
||||
// Cooldown is disabled
|
||||
if fetcher.failCooldown < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for id, failTime := range fetcher.failedOperations {
|
||||
if failTime.Add(time.Duration(fetcher.failCooldown) * time.Second).Before(now) {
|
||||
log.Infof("Torrent TID %d gone through cooldown, removing from failures")
|
||||
// Deleting keys inside a loop seems to be safe.
|
||||
delete(fetcher.failedOperations, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) fillQueue() {
|
||||
toFill := fetcher.queueSize - len(fetcher.queue)
|
||||
|
||||
|
@ -180,15 +198,16 @@ func (fetcher *MetainfoFetcher) fillQueue() {
|
|||
}
|
||||
dbTorrents, count, err := torrentService.GetTorrents(params, fetcher.queueSize, 0)
|
||||
|
||||
if count == 0 {
|
||||
log.Infof("No torrents for filesize update")
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Infof("Failed to get torrents for metainfo updating")
|
||||
return
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
log.Infof("No torrents for metainfo update")
|
||||
return
|
||||
}
|
||||
|
||||
for _, T := range dbTorrents {
|
||||
if fetcher.isFetchingOrFailed(T) {
|
||||
|
@ -215,15 +234,15 @@ func (fetcher *MetainfoFetcher) run() {
|
|||
done := 0
|
||||
fetcher.fillQueue()
|
||||
for done == 0 {
|
||||
fetcher.removeOldFailures()
|
||||
fetcher.fillQueue()
|
||||
select {
|
||||
case done = <-fetcher.done:
|
||||
break
|
||||
case result = <-fetcher.results:
|
||||
fetcher.gotResult(result)
|
||||
fetcher.fillQueue()
|
||||
break
|
||||
case <-fetcher.wakeUp.C:
|
||||
fetcher.fillQueue()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -245,6 +264,7 @@ func (fetcher *MetainfoFetcher) Close() error {
|
|||
}
|
||||
|
||||
fetcher.done <- 1
|
||||
fetcher.torrentClient.Close()
|
||||
log.Infof("Send done signal to everyone, waiting...")
|
||||
fetcher.wg.Wait()
|
||||
return nil
|
||||
|
|
|
@ -41,20 +41,20 @@ func (op *FetchOperation) Start(out chan Result) {
|
|||
out <- Result{op, err, nil}
|
||||
return
|
||||
}
|
||||
defer downloadingTorrent.Drop()
|
||||
|
||||
timeoutTicker := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout))
|
||||
timeoutTimer := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout))
|
||||
defer timeoutTimer.Stop()
|
||||
select {
|
||||
case <-downloadingTorrent.GotInfo():
|
||||
downloadingTorrent.Drop()
|
||||
out <- Result{op, nil, downloadingTorrent.Info()}
|
||||
break
|
||||
case <-timeoutTicker.C:
|
||||
downloadingTorrent.Drop()
|
||||
case <-timeoutTimer.C:
|
||||
out <- Result{op, errors.New("Timeout"), nil}
|
||||
break
|
||||
case <-op.done:
|
||||
downloadingTorrent.Drop()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Référencer dans un nouveau ticket