diff --git a/config/metainfoFetcher.go b/config/metainfoFetcher.go index eceb37a1..8afe58a5 100644 --- a/config/metainfoFetcher.go +++ b/config/metainfoFetcher.go @@ -4,6 +4,7 @@ type MetainfoFetcherConfig struct { QueueSize int `json:"queue_size"` Timeout int `json:"timeout"` MaxDays int `json:"max_days"` + FailCooldown int `json:"fail_cooldown"` WakeUpInterval int `json:"wake_up_interval"` UploadRateLimiter int `json:"upload_rate_limiter"` @@ -14,9 +15,10 @@ var DefaultMetainfoFetcherConfig = MetainfoFetcherConfig{ QueueSize: 10, Timeout: 120, // 2 min MaxDays: 90, + FailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again. WakeUpInterval: 300, // 5 min - UploadRateLimiter: 1024, + UploadRateLimiter: 1024, // kbps DownloadRateLimiter: 1024, } diff --git a/service/torrent/metainfoFetcher/metainfoFetcher.go b/service/torrent/metainfoFetcher/metainfoFetcher.go index d84b9883..807bc381 100644 --- a/service/torrent/metainfoFetcher/metainfoFetcher.go +++ b/service/torrent/metainfoFetcher/metainfoFetcher.go @@ -20,10 +20,11 @@ type MetainfoFetcher struct { queueSize int timeout int maxDays int + failCooldown int done chan int queue []*FetchOperation queueMutex sync.Mutex - failedOperations map[uint]struct{} + failedOperations map[uint]time.Time wakeUp *time.Ticker wg sync.WaitGroup } @@ -47,8 +48,9 @@ func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher, queueSize: fetcherConfig.QueueSize, timeout: fetcherConfig.Timeout, maxDays: fetcherConfig.MaxDays, + failCooldown: fetcherConfig.FailCooldown, done: make(chan int, 1), - failedOperations: make(map[uint]struct{}), + failedOperations: make(map[uint]time.Time), wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)), } @@ -150,12 +152,28 @@ func (fetcher *MetainfoFetcher) gotResult(r Result) { } if !updatedSuccessfully { - fetcher.failedOperations[r.operation.torrent.ID] = struct{}{} + fetcher.failedOperations[r.operation.torrent.ID] = time.Now() } fetcher.removeFromQueue(r.operation) } +func (fetcher *MetainfoFetcher) removeOldFailures() { + // Cooldown is disabled + if fetcher.failCooldown < 0 { + return + } + + now := time.Now() + for id, failTime := range fetcher.failedOperations { + if failTime.Add(time.Duration(fetcher.failCooldown) * time.Second).Before(now) { + log.Infof("Torrent TID %d gone through cooldown, removing from failures") + // Deleting keys inside a loop seems to be safe. + delete(fetcher.failedOperations, id) + } + } +} + func (fetcher *MetainfoFetcher) fillQueue() { toFill := fetcher.queueSize - len(fetcher.queue) @@ -180,15 +198,16 @@ func (fetcher *MetainfoFetcher) fillQueue() { } dbTorrents, count, err := torrentService.GetTorrents(params, fetcher.queueSize, 0) + if count == 0 { + log.Infof("No torrents for filesize update") + return + } + if err != nil { log.Infof("Failed to get torrents for metainfo updating") return } - - if count == 0 { - log.Infof("No torrents for metainfo update") - return - } + for _, T := range dbTorrents { if fetcher.isFetchingOrFailed(T) { @@ -215,15 +234,15 @@ func (fetcher *MetainfoFetcher) run() { done := 0 fetcher.fillQueue() for done == 0 { + fetcher.removeOldFailures() + fetcher.fillQueue() select { case done = <-fetcher.done: break case result = <-fetcher.results: fetcher.gotResult(result) - fetcher.fillQueue() break case <-fetcher.wakeUp.C: - fetcher.fillQueue() break } } @@ -245,6 +264,7 @@ func (fetcher *MetainfoFetcher) Close() error { } fetcher.done <- 1 + fetcher.torrentClient.Close() log.Infof("Send done signal to everyone, waiting...") fetcher.wg.Wait() return nil diff --git a/service/torrent/metainfoFetcher/operation.go b/service/torrent/metainfoFetcher/operation.go index 6c8e6905..92eada2e 100644 --- a/service/torrent/metainfoFetcher/operation.go +++ b/service/torrent/metainfoFetcher/operation.go @@ -41,20 +41,20 @@ func (op *FetchOperation) Start(out chan Result) { out <- Result{op, err, nil} return } + defer downloadingTorrent.Drop() - timeoutTicker := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout)) + timeoutTimer := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout)) + defer timeoutTimer.Stop() select { case <-downloadingTorrent.GotInfo(): - downloadingTorrent.Drop() out <- Result{op, nil, downloadingTorrent.Info()} break - case <-timeoutTicker.C: - downloadingTorrent.Drop() + case <-timeoutTimer.C: out <- Result{op, errors.New("Timeout"), nil} break case <-op.done: - downloadingTorrent.Drop() break } } +