diff --git a/config/default_config.yml b/config/default_config.yml index 32fdbfd3..bc67083e 100644 --- a/config/default_config.yml +++ b/config/default_config.yml @@ -69,8 +69,9 @@ metainfo_fetcher: base_fail_cooldown: 1800 max_fail_cooldown: 172800 wake_up_interval: 300 - upload_rate_limiter: 1024 - download_rate_limiter: 1024 +# limits are in KiB, zero means no limit + upload_rate_limit: 1024 + download_rate_limit: 1024 fetch_new_torrents_only: true i18n: # Default configuration for translation directory diff --git a/config/types.go b/config/types.go index 826ef961..963b52f7 100644 --- a/config/types.go +++ b/config/types.go @@ -136,8 +136,8 @@ type MetainfoFetcherConfig struct { MaxFailCooldown int `json:"max_fail_cooldown" yaml:"max_fail_cooldown,omitempty"` WakeUpInterval int `json:"wake_up_interval" yaml:"wake_up_interval,omitempty"` - UploadRateLimiter int `json:"upload_rate_limiter" yaml:"upload_rate_limiter,omitempty"` - DownloadRateLimiter int `json:"download_rate_limiter" yaml:"download_rate_limiter,omitempty"` + UploadRateLimitKiB int `json:"upload_rate_limit" yaml:"upload_rate_limit,omitempty"` + DownloadRateLimitKiB int `json:"download_rate_limit" yaml:"download_rate_limit,omitempty"` FetchNewTorrentsOnly bool `json:"fetch_new_torrents_only" yaml:"fetch_new_torrents_only,omitempty"` } diff --git a/service/torrent/metainfoFetcher/metainfo_fetcher.go b/service/torrent/metainfoFetcher/metainfo_fetcher.go index 8164902b..9334d9cd 100644 --- a/service/torrent/metainfoFetcher/metainfo_fetcher.go +++ b/service/torrent/metainfoFetcher/metainfo_fetcher.go @@ -26,7 +26,7 @@ type MetainfoFetcher struct { baseFailCooldown int maxFailCooldown int newTorrentsOnly bool - done chan int + done chan struct{} queue []*FetchOperation queueMutex sync.Mutex failedOperations map[uint]time.Time @@ -37,20 +37,32 @@ type MetainfoFetcher struct { } // New : Creates a MetainfoFetcher struct -func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher, err error) { +func New(fetcherConfig *config.MetainfoFetcherConfig) (*MetainfoFetcher, error) { clientConfig := torrent.Config{} + // Well, it seems this is the right way to convert speed -> rate.Limiter // https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go - if fetcherConfig.UploadRateLimiter != -1 { - clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.UploadRateLimiter*1024), 256<<10) + const uploadBurst = 0x40000 // 256K + const downloadBurst = 0x100000 // 1M + uploadLimit := fetcherConfig.UploadRateLimitKiB*1024 + downloadLimit := fetcherConfig.DownloadRateLimitKiB*1024 + if uploadLimit > 0 { + limit := rate.Limit(uploadLimit) + limiter := rate.NewLimiter(limit, uploadBurst) + clientConfig.UploadRateLimiter = limiter } - if fetcherConfig.DownloadRateLimiter != -1 { - clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.DownloadRateLimiter*1024), 1<<20) + if downloadLimit > 0 { + limit := rate.Limit(downloadLimit) + limiter := rate.NewLimiter(limit, downloadBurst) + clientConfig.DownloadRateLimiter = limiter } client, err := torrent.NewClient(&clientConfig) + if err != nil { + return nil, err + } - fetcher = &MetainfoFetcher{ + fetcher := &MetainfoFetcher{ torrentClient: client, results: make(chan Result, fetcherConfig.QueueSize), queueSize: fetcherConfig.QueueSize, @@ -59,13 +71,13 @@ func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher, newTorrentsOnly: fetcherConfig.FetchNewTorrentsOnly, baseFailCooldown: fetcherConfig.BaseFailCooldown, maxFailCooldown: fetcherConfig.MaxFailCooldown, - done: make(chan int, 1), + done: make(chan struct{}, 1), failedOperations: make(map[uint]time.Time), numFails: make(map[uint]int), wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)), } - return + return fetcher, nil } func (fetcher *MetainfoFetcher) isFetchingOrFailed(t model.Torrent) bool { @@ -291,20 +303,17 @@ func (fetcher *MetainfoFetcher) run() { defer fetcher.wg.Done() - done := 0 - for done == 0 { + for { fetcher.removeOldFailures() fetcher.fillQueue() select { - case done = <-fetcher.done: + case <-fetcher.done: log.Infof("Got done signal on main loop, leaving...") - break + return case result = <-fetcher.results: fetcher.gotResult(result) - break case <-fetcher.wakeUp.C: log.Infof("Got wake up signal...") - break } } } @@ -323,10 +332,10 @@ func (fetcher *MetainfoFetcher) Close() error { // Send the done event to every Operation for _, op := range fetcher.queue { - op.done <- 1 + op.done <- struct{}{} } - fetcher.done <- 1 + fetcher.done <- struct{}{} fetcher.torrentClient.Close() log.Infof("Send done signal to everyone, waiting...") fetcher.wg.Wait() diff --git a/service/torrent/metainfoFetcher/operation.go b/service/torrent/metainfoFetcher/operation.go index 225ef7af..c163e299 100644 --- a/service/torrent/metainfoFetcher/operation.go +++ b/service/torrent/metainfoFetcher/operation.go @@ -15,7 +15,7 @@ import ( type FetchOperation struct { fetcher *MetainfoFetcher torrent model.Torrent - done chan int + done chan struct{} } // Result struct @@ -30,7 +30,7 @@ func NewFetchOperation(fetcher *MetainfoFetcher, dbEntry model.Torrent) (op *Fet op = &FetchOperation{ fetcher: fetcher, torrent: dbEntry, - done: make(chan int, 1), + done: make(chan struct{}, 1), } return } @@ -52,11 +52,8 @@ func (op *FetchOperation) Start(out chan Result) { select { case <-downloadingTorrent.GotInfo(): out <- Result{op, nil, downloadingTorrent.Info()} - break case <-timeoutTimer.C: out <- Result{op, errors.New("Timeout"), nil} - break case <-op.done: - break } }