Albirew/nyaa-pantsu
Archivé
1
0
Bifurcation 0
Ce dépôt a été archivé le 2022-05-07. Vous pouvez voir ses fichiers ou le cloner, mais pas ouvrir de ticket ou de demandes d'ajout, ni soumettre de changements.
nyaa-pantsu/service/torrent/metainfoFetcher/metainfo_fetcher.go

349 lignes
9,2 Kio
Go
Brut Vue normale Historique

2017-05-15 11:32:28 +02:00
package metainfoFetcher
2017-05-13 19:58:48 +02:00
import (
"math"
"sync"
"time"
2017-05-17 07:58:40 +02:00
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/db"
"github.com/NyaaPantsu/nyaa/model"
serviceBase "github.com/NyaaPantsu/nyaa/service"
torrentService "github.com/NyaaPantsu/nyaa/service/torrent"
"github.com/NyaaPantsu/nyaa/util/log"
2017-05-13 19:58:48 +02:00
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"golang.org/x/time/rate"
2017-05-13 19:58:48 +02:00
)
// MetainfoFetcher Struct
2017-05-14 13:20:34 +02:00
type MetainfoFetcher struct {
2017-05-13 19:58:48 +02:00
torrentClient *torrent.Client
results chan Result
queueSize int
timeout int
maxDays int
baseFailCooldown int
maxFailCooldown int
newTorrentsOnly bool
2017-06-02 12:03:23 +02:00
done chan struct{}
2017-05-13 19:58:48 +02:00
queue []*FetchOperation
queueMutex sync.Mutex
failedOperations map[uint]time.Time
numFails map[uint]int
failsMutex sync.Mutex
2017-05-13 19:58:48 +02:00
wakeUp *time.Ticker
2017-05-13 21:07:39 +02:00
wg sync.WaitGroup
2017-05-13 19:58:48 +02:00
}
// New : Creates a MetainfoFetcher struct
2017-06-02 12:03:23 +02:00
func New(fetcherConfig *config.MetainfoFetcherConfig) (*MetainfoFetcher, error) {
clientConfig := torrent.Config{}
2017-06-02 12:03:23 +02:00
// Well, it seems this is the right way to convert speed -> rate.Limiter
// https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go
2017-06-02 12:03:23 +02:00
const uploadBurst = 0x40000 // 256K
const downloadBurst = 0x100000 // 1M
uploadLimit := fetcherConfig.UploadRateLimitKiB*1024
downloadLimit := fetcherConfig.DownloadRateLimitKiB*1024
if uploadLimit > 0 {
limit := rate.Limit(uploadLimit)
limiter := rate.NewLimiter(limit, uploadBurst)
clientConfig.UploadRateLimiter = limiter
}
2017-06-02 12:03:23 +02:00
if downloadLimit > 0 {
limit := rate.Limit(downloadLimit)
limiter := rate.NewLimiter(limit, downloadBurst)
clientConfig.DownloadRateLimiter = limiter
}
client, err := torrent.NewClient(&clientConfig)
2017-06-02 12:03:23 +02:00
if err != nil {
return nil, err
}
2017-06-02 12:03:23 +02:00
fetcher := &MetainfoFetcher{
2017-05-13 19:58:48 +02:00
torrentClient: client,
results: make(chan Result, fetcherConfig.QueueSize),
2017-05-13 19:58:48 +02:00
queueSize: fetcherConfig.QueueSize,
timeout: fetcherConfig.Timeout,
maxDays: fetcherConfig.MaxDays,
newTorrentsOnly: fetcherConfig.FetchNewTorrentsOnly,
baseFailCooldown: fetcherConfig.BaseFailCooldown,
maxFailCooldown: fetcherConfig.MaxFailCooldown,
2017-06-02 12:03:23 +02:00
done: make(chan struct{}, 1),
failedOperations: make(map[uint]time.Time),
numFails: make(map[uint]int),
2017-05-13 19:58:48 +02:00
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
}
2017-06-02 12:03:23 +02:00
return fetcher, nil
2017-05-13 19:58:48 +02:00
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) isFetchingOrFailed(t model.Torrent) bool {
2017-05-13 19:58:48 +02:00
for _, op := range fetcher.queue {
if op.torrent.ID == t.ID {
return true
}
}
_, ok := fetcher.failedOperations[t.ID]
return ok
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) addToQueue(op *FetchOperation) bool {
2017-05-13 19:58:48 +02:00
fetcher.queueMutex.Lock()
defer fetcher.queueMutex.Unlock()
2017-05-15 11:32:28 +02:00
if len(fetcher.queue)+1 > fetcher.queueSize {
2017-05-13 19:58:48 +02:00
return false
}
fetcher.queue = append(fetcher.queue, op)
return true
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) removeFromQueue(op *FetchOperation) bool {
2017-05-13 19:58:48 +02:00
fetcher.queueMutex.Lock()
defer fetcher.queueMutex.Unlock()
for i, queueOP := range fetcher.queue {
if queueOP == op {
fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...)
return true
}
}
return false
}
// markAsFailed bumps the failure counter of torrent tID and records the
// current time as its latest failure, both under failsMutex.
func (fetcher *MetainfoFetcher) markAsFailed(tID uint) {
	fetcher.failsMutex.Lock()
	defer fetcher.failsMutex.Unlock()

	// A missing key reads as zero, so the first failure yields a count of 1.
	fetcher.numFails[tID]++
	fetcher.failedOperations[tID] = time.Now()
}
// removeFromFailed forgets the latest failure time of torrent tID, under
// failsMutex. The failure counter in numFails is left as it is.
func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) {
	fetcher.failsMutex.Lock()
	delete(fetcher.failedOperations, tID)
	fetcher.failsMutex.Unlock()
}
func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error {
torrentFiles := info.UpvertedFiles()
log.Infof("TID %d has %d files.", dbEntry.ID, len(torrentFiles))
for _, file := range torrentFiles {
var path []string
if file.Path != nil {
path = file.Path
} else {
// If it's nil, use the torrent name (info.Name) as the path (single-file torrent)
path = append(path, info.Name)
}
// Can't read FileList from the GetTorrents output, rely on the unique_index
// to ensure no files are duplicated.
log.Infof("Adding file %s to filelist of TID %d", file.DisplayPath(info), dbEntry.ID)
dbFile := model.File{
TorrentID: dbEntry.ID,
2017-05-15 11:32:28 +02:00
Filesize: file.Length,
}
err := dbFile.SetPath(path)
if err != nil {
return err
}
err = db.ORM.Create(&dbFile).Error
if err != nil {
return err
}
}
return nil
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) gotResult(r Result) {
2017-05-13 19:58:48 +02:00
updatedSuccessfully := false
if r.err != nil {
log.Infof("Failed to get torrent metainfo (TID: %d), err %v", r.operation.torrent.ID, r.err)
} else if r.info.TotalLength() == 0 {
2017-05-13 19:58:48 +02:00
log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID)
} else {
lengthOK := true
if r.operation.torrent.Filesize != r.info.TotalLength() {
log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID)
r.operation.torrent.Filesize = r.info.TotalLength()
_, err := torrentService.UpdateTorrent(r.operation.torrent)
if err != nil {
log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID)
lengthOK = false
}
2017-05-13 19:58:48 +02:00
}
filelistOK := true
// Create the file list, if it's missing.
if len(r.operation.torrent.FileList) == 0 {
err := updateFileList(r.operation.torrent, r.info)
if err != nil {
log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID)
filelistOK = false
}
}
updatedSuccessfully = lengthOK && filelistOK
2017-05-13 19:58:48 +02:00
}
if !updatedSuccessfully {
fetcher.markAsFailed(r.operation.torrent.ID)
2017-05-13 19:58:48 +02:00
}
fetcher.removeFromQueue(r.operation)
}
func (fetcher *MetainfoFetcher) removeOldFailures() {
// Cooldown is disabled
if fetcher.baseFailCooldown < 0 {
return
}
maxCd := time.Duration(fetcher.maxFailCooldown) * time.Second
now := time.Now()
for id, failTime := range fetcher.failedOperations {
2017-05-15 11:32:28 +02:00
cdMult := int(math.Pow(2, float64(fetcher.numFails[id]-1)))
cd := time.Duration(cdMult*fetcher.baseFailCooldown) * time.Second
if cd > maxCd {
cd = maxCd
}
if failTime.Add(cd).Before(now) {
log.Infof("Torrent TID %d gone through cooldown, removing from failures", id)
// Deleting keys inside a loop seems to be safe.
fetcher.removeFromFailed(id)
}
}
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) fillQueue() {
2017-05-13 19:58:48 +02:00
toFill := fetcher.queueSize - len(fetcher.queue)
if toFill <= 0 {
return
}
2017-05-15 11:32:28 +02:00
oldest := time.Now().Add(0 - (time.Hour * time.Duration(24*fetcher.maxDays)))
excludedIDS := make([]uint, 0, len(fetcher.failedOperations))
2017-05-24 09:15:07 +02:00
for id := range fetcher.failedOperations {
excludedIDS = append(excludedIDS, id)
}
2017-05-24 09:11:13 +02:00
tFiles := config.Conf.Models.FilesTableName
tTorrents := config.Conf.Models.TorrentsTableName
// Select the torrents with no filesize, or without any rows with torrent_id in the files table...
2017-05-24 09:11:13 +02:00
queryString := "((filesize IS NULL OR filesize = 0) OR (" + tTorrents + ".torrent_id NOT " +
"IN (SELECT " + tFiles + ".torrent_id FROM " + tFiles + " WHERE " + tFiles +
".torrent_id = " + tTorrents + ".torrent_id)))"
var whereParamsArgs []interface{}
// that are newer than maxDays...
queryString += " AND date > ? "
whereParamsArgs = append(whereParamsArgs, oldest)
// that didn't fail recently...
if len(excludedIDS) > 0 {
queryString += " AND torrent_id NOT IN (?) "
whereParamsArgs = append(whereParamsArgs, excludedIDS)
}
// and, if true, that aren't from the old Nyaa database
if fetcher.newTorrentsOnly {
queryString += " AND torrent_id > ? "
whereParamsArgs = append(whereParamsArgs, config.Conf.Models.LastOldTorrentID)
}
params := serviceBase.CreateWhereParams(queryString, whereParamsArgs...)
dbTorrents, err := torrentService.GetTorrentsOrderByNoCount(&params, "", fetcher.queueSize, 0)
2017-05-13 19:58:48 +02:00
if len(dbTorrents) == 0 {
log.Infof("No torrents for filesize update")
2017-05-13 19:58:48 +02:00
return
}
if err != nil {
log.Infof("Failed to get torrents for metainfo updating")
2017-05-13 19:58:48 +02:00
return
}
for _, T := range dbTorrents {
if fetcher.isFetchingOrFailed(T) {
continue
}
operation := NewFetchOperation(fetcher, T)
if fetcher.addToQueue(operation) {
log.Infof("Added TID %d for filesize fetching", T.ID)
2017-05-13 21:07:39 +02:00
fetcher.wg.Add(1)
2017-05-13 19:58:48 +02:00
go operation.Start(fetcher.results)
} else {
break
}
}
}
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) run() {
2017-05-13 19:58:48 +02:00
var result Result
2017-05-13 21:07:39 +02:00
defer fetcher.wg.Done()
2017-06-02 12:03:23 +02:00
for {
fetcher.removeOldFailures()
fetcher.fillQueue()
2017-05-13 19:58:48 +02:00
select {
2017-06-02 12:03:23 +02:00
case <-fetcher.done:
log.Infof("Got done signal on main loop, leaving...")
2017-06-02 12:03:23 +02:00
return
2017-05-13 19:58:48 +02:00
case result = <-fetcher.results:
fetcher.gotResult(result)
case <-fetcher.wakeUp.C:
log.Infof("Got wake up signal...")
2017-05-13 19:58:48 +02:00
}
}
}
// RunAsync method
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) RunAsync() {
2017-05-13 21:07:39 +02:00
fetcher.wg.Add(1)
go fetcher.run()
}
// Close method
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) Close() error {
2017-05-13 19:58:48 +02:00
fetcher.queueMutex.Lock()
defer fetcher.queueMutex.Unlock()
// Send the done event to every Operation
for _, op := range fetcher.queue {
2017-06-02 12:03:23 +02:00
op.done <- struct{}{}
2017-05-13 19:58:48 +02:00
}
2017-06-02 12:03:23 +02:00
fetcher.done <- struct{}{}
fetcher.torrentClient.Close()
log.Infof("Send done signal to everyone, waiting...")
2017-05-13 21:07:39 +02:00
fetcher.wg.Wait()
2017-05-13 19:58:48 +02:00
return nil
}
// Wait method
2017-05-14 13:20:34 +02:00
func (fetcher *MetainfoFetcher) Wait() {
2017-05-13 21:07:39 +02:00
fetcher.wg.Wait()
}