Use Mutex when modifying failedOperations, add exponential cooldown
Just to be safe, won't allow concurrent goroutines to modify the map. The exponential cooldown prevents newer torrents with no seeds blocking older ones with seeds, when there are enough failures that a cooldown event would fill the queue with only failed torrents.
Cette révision appartient à :
Parent
a55cf2a803
révision
3dced6fdf0
2 fichiers modifiés avec 52 ajouts et 17 suppressions
|
@ -1,22 +1,24 @@
|
|||
package config
|
||||
|
||||
type MetainfoFetcherConfig struct {
|
||||
QueueSize int `json:"queue_size"`
|
||||
Timeout int `json:"timeout"`
|
||||
MaxDays int `json:"max_days"`
|
||||
FailCooldown int `json:"fail_cooldown"`
|
||||
WakeUpInterval int `json:"wake_up_interval"`
|
||||
QueueSize int `json:"queue_size"`
|
||||
Timeout int `json:"timeout"`
|
||||
MaxDays int `json:"max_days"`
|
||||
BaseFailCooldown int `json:"base_fail_cooldown"`
|
||||
MaxFailCooldown int `json:"max_fail_cooldown"`
|
||||
WakeUpInterval int `json:"wake_up_interval"`
|
||||
|
||||
UploadRateLimiter int `json:"upload_rate_limiter"`
|
||||
DownloadRateLimiter int `json:"download_rate_limiter"`
|
||||
}
|
||||
|
||||
var DefaultMetainfoFetcherConfig = MetainfoFetcherConfig{
|
||||
QueueSize: 10,
|
||||
Timeout: 120, // 2 min
|
||||
MaxDays: 90,
|
||||
FailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again.
|
||||
WakeUpInterval: 300, // 5 min
|
||||
QueueSize: 10,
|
||||
Timeout: 120, // 2 min
|
||||
MaxDays: 90,
|
||||
BaseFailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again.
|
||||
MaxFailCooldown: 48 * 60 * 60,
|
||||
WakeUpInterval: 300, // 5 min
|
||||
|
||||
UploadRateLimiter: 1024, // kbps
|
||||
DownloadRateLimiter: 1024,
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
serviceBase "github.com/ewhal/nyaa/service"
|
||||
torrentService "github.com/ewhal/nyaa/service/torrent"
|
||||
"golang.org/x/time/rate"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -20,11 +21,14 @@ type MetainfoFetcher struct {
|
|||
queueSize int
|
||||
timeout int
|
||||
maxDays int
|
||||
failCooldown int
|
||||
baseFailCooldown int
|
||||
maxFailCooldown int
|
||||
done chan int
|
||||
queue []*FetchOperation
|
||||
queueMutex sync.Mutex
|
||||
failedOperations map[uint]time.Time
|
||||
numFails map[uint]int
|
||||
failsMutex sync.Mutex
|
||||
wakeUp *time.Ticker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
@ -48,9 +52,11 @@ func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher,
|
|||
queueSize: fetcherConfig.QueueSize,
|
||||
timeout: fetcherConfig.Timeout,
|
||||
maxDays: fetcherConfig.MaxDays,
|
||||
failCooldown: fetcherConfig.FailCooldown,
|
||||
baseFailCooldown: fetcherConfig.BaseFailCooldown,
|
||||
maxFailCooldown: fetcherConfig.MaxFailCooldown,
|
||||
done: make(chan int, 1),
|
||||
failedOperations: make(map[uint]time.Time),
|
||||
numFails: make(map[uint]int),
|
||||
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
|
||||
}
|
||||
|
||||
|
@ -95,6 +101,26 @@ func (fetcher *MetainfoFetcher) removeFromQueue(op *FetchOperation) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) markAsFailed(tID uint) {
|
||||
fetcher.failsMutex.Lock()
|
||||
defer fetcher.failsMutex.Unlock()
|
||||
|
||||
if n, ok := fetcher.numFails[tID]; ok {
|
||||
fetcher.numFails[tID] = n + 1
|
||||
} else {
|
||||
fetcher.numFails[tID] = 1
|
||||
}
|
||||
|
||||
fetcher.failedOperations[tID] = time.Now()
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) {
|
||||
fetcher.failsMutex.Lock()
|
||||
defer fetcher.failsMutex.Unlock()
|
||||
|
||||
delete(fetcher.failedOperations, tID)
|
||||
}
|
||||
|
||||
func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error {
|
||||
torrentFiles := info.UpvertedFiles()
|
||||
log.Infof("TID %d has %d files.", dbEntry.ID, len(torrentFiles))
|
||||
|
@ -152,7 +178,7 @@ func (fetcher *MetainfoFetcher) gotResult(r Result) {
|
|||
}
|
||||
|
||||
if !updatedSuccessfully {
|
||||
fetcher.failedOperations[r.operation.torrent.ID] = time.Now()
|
||||
fetcher.markAsFailed(r.operation.torrent.ID)
|
||||
}
|
||||
|
||||
fetcher.removeFromQueue(r.operation)
|
||||
|
@ -160,16 +186,23 @@ func (fetcher *MetainfoFetcher) gotResult(r Result) {
|
|||
|
||||
func (fetcher *MetainfoFetcher) removeOldFailures() {
|
||||
// Cooldown is disabled
|
||||
if fetcher.failCooldown < 0 {
|
||||
if fetcher.baseFailCooldown < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
maxCd := time.Duration(fetcher.maxFailCooldown) * time.Second
|
||||
now := time.Now()
|
||||
for id, failTime := range fetcher.failedOperations {
|
||||
if failTime.Add(time.Duration(fetcher.failCooldown) * time.Second).Before(now) {
|
||||
log.Infof("Torrent TID %d gone through cooldown, removing from failures")
|
||||
cdMult := int(math.Pow(2, float64(fetcher.numFails[id] - 1)))
|
||||
cd := time.Duration(cdMult * fetcher.baseFailCooldown) * time.Second
|
||||
if cd > maxCd {
|
||||
cd = maxCd
|
||||
}
|
||||
|
||||
if failTime.Add(cd).Before(now) {
|
||||
log.Infof("Torrent TID %d gone through cooldown, removing from failures", id)
|
||||
// Deleting keys inside a loop seems to be safe.
|
||||
delete(fetcher.failedOperations, id)
|
||||
fetcher.removeFromFailed(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Référencer dans un nouveau ticket