Merge pull request #477 from ElegantMonkey/dev
MetainfoFetcher and file lists
Cette révision appartient à :
révision
a4a72a3346
|
@ -32,12 +32,12 @@ type Config struct {
|
|||
// optional i2p configuration
|
||||
I2P *I2PConfig `json:"i2p"`
|
||||
// filesize fetcher config
|
||||
FilesizeFetcher FilesizeFetcherConfig `json:"filesize_fetcher"`
|
||||
MetainfoFetcher MetainfoFetcherConfig `json:"metainfo_fetcher"`
|
||||
// internationalization config
|
||||
I18n I18nConfig `json:"i18n"`
|
||||
}
|
||||
|
||||
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil, DefaultFilesizeFetcherConfig, DefaultI18nConfig}
|
||||
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil, DefaultMetainfoFetcherConfig, DefaultI18nConfig}
|
||||
|
||||
var allowedDatabaseTypes = map[string]bool{
|
||||
"sqlite3": true,
|
||||
|
@ -61,7 +61,7 @@ func New() *Config {
|
|||
config.DBLogMode = Defaults.DBLogMode
|
||||
config.Scrape = Defaults.Scrape
|
||||
config.Cache = Defaults.Cache
|
||||
config.FilesizeFetcher = Defaults.FilesizeFetcher
|
||||
config.MetainfoFetcher = Defaults.MetainfoFetcher
|
||||
config.I18n = Defaults.I18n
|
||||
return &config
|
||||
}
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
package config
|
||||
|
||||
type FilesizeFetcherConfig struct {
|
||||
QueueSize int `json:"queue_size"`
|
||||
Timeout int `json:"timeout"`
|
||||
MaxDays int `json:"max_days"`
|
||||
WakeUpInterval int `json:"wake_up_interval"`
|
||||
}
|
||||
|
||||
var DefaultFilesizeFetcherConfig = FilesizeFetcherConfig{
|
||||
QueueSize: 10,
|
||||
Timeout: 120, // 2 min
|
||||
MaxDays: 90,
|
||||
WakeUpInterval: 300, // 5 min
|
||||
}
|
||||
|
26
config/metainfoFetcher.go
Fichier normal
26
config/metainfoFetcher.go
Fichier normal
|
@ -0,0 +1,26 @@
|
|||
package config
|
||||
|
||||
type MetainfoFetcherConfig struct {
|
||||
QueueSize int `json:"queue_size"`
|
||||
Timeout int `json:"timeout"`
|
||||
MaxDays int `json:"max_days"`
|
||||
BaseFailCooldown int `json:"base_fail_cooldown"`
|
||||
MaxFailCooldown int `json:"max_fail_cooldown"`
|
||||
WakeUpInterval int `json:"wake_up_interval"`
|
||||
|
||||
UploadRateLimiter int `json:"upload_rate_limiter"`
|
||||
DownloadRateLimiter int `json:"download_rate_limiter"`
|
||||
}
|
||||
|
||||
var DefaultMetainfoFetcherConfig = MetainfoFetcherConfig{
|
||||
QueueSize: 10,
|
||||
Timeout: 120, // 2 min
|
||||
MaxDays: 90,
|
||||
BaseFailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again.
|
||||
MaxFailCooldown: 48 * 60 * 60,
|
||||
WakeUpInterval: 300, // 5 min
|
||||
|
||||
UploadRateLimiter: 1024, // kbps
|
||||
DownloadRateLimiter: 1024,
|
||||
}
|
||||
|
|
@ -58,7 +58,11 @@ func GormInit(conf *config.Config, logger Logger) (*gorm.DB, error) {
|
|||
if db.Error != nil {
|
||||
return db, db.Error
|
||||
}
|
||||
db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{}, &model.File{})
|
||||
db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{})
|
||||
if db.Error != nil {
|
||||
return db, db.Error
|
||||
}
|
||||
db.AutoMigrate(&model.File{})
|
||||
if db.Error != nil {
|
||||
return db, db.Error
|
||||
}
|
||||
|
|
12
main.go
12
main.go
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/ewhal/nyaa/network"
|
||||
"github.com/ewhal/nyaa/router"
|
||||
"github.com/ewhal/nyaa/service/scraper"
|
||||
"github.com/ewhal/nyaa/service/torrent/filesizeFetcher"
|
||||
"github.com/ewhal/nyaa/service/torrent/metainfoFetcher"
|
||||
"github.com/ewhal/nyaa/service/user"
|
||||
"github.com/ewhal/nyaa/util/languages"
|
||||
"github.com/ewhal/nyaa/util/log"
|
||||
|
@ -84,9 +84,9 @@ func RunScraper(conf *config.Config) {
|
|||
scraper.Wait()
|
||||
}
|
||||
|
||||
// RunFilesizeFetcher runs the database filesize fetcher main loop
|
||||
func RunFilesizeFetcher(conf *config.Config) {
|
||||
fetcher, err := filesizeFetcher.New(&conf.FilesizeFetcher)
|
||||
// RunMetainfoFetcher runs the database filesize fetcher main loop
|
||||
func RunMetainfoFetcher(conf *config.Config) {
|
||||
fetcher, err := metainfoFetcher.New(&conf.MetainfoFetcher)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to start fetcher, %s", err)
|
||||
return
|
||||
|
@ -148,8 +148,8 @@ func main() {
|
|||
RunScraper(conf)
|
||||
} else if *mode == "webapp" {
|
||||
RunServer(conf)
|
||||
} else if *mode == "metadata_fetcher" {
|
||||
RunFilesizeFetcher(conf)
|
||||
} else if *mode == "metainfo_fetcher" {
|
||||
RunMetainfoFetcher(conf)
|
||||
} else {
|
||||
log.Fatalf("invalid runtime mode: %s", *mode)
|
||||
}
|
||||
|
|
|
@ -2,15 +2,13 @@ package model
|
|||
|
||||
type File struct {
|
||||
ID uint `gorm:"column:file_id;primary_key"`
|
||||
TorrentID uint `gorm:"column:torrent_id"`
|
||||
Path string `gorm:"column:path"`
|
||||
TorrentID uint `gorm:"column:torrent_id;unique_index:idx_tid_path"`
|
||||
Path string `gorm:"column:path;unique_index:idx_tid_path"`
|
||||
Filesize int64 `gorm:"column:filesize"`
|
||||
|
||||
Torrent *Torrent `gorm:"AssociationForeignKey:TorrentID;ForeignKey:torrent_id"`
|
||||
}
|
||||
|
||||
// Returns the total size of memory allocated for this struct
|
||||
func (f File) Size() int {
|
||||
return (1 + len(f.Path) + 2) * 8;
|
||||
return (2 + len(f.Path) + 2) * 8;
|
||||
}
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ type CommentJSON struct {
|
|||
|
||||
type FileJSON struct {
|
||||
Path string `json:"path"`
|
||||
Length int64 `json:"length"`
|
||||
Filesize string `json:"filesize"`
|
||||
}
|
||||
|
||||
type TorrentJSON struct {
|
||||
|
@ -116,7 +116,7 @@ type TorrentJSON struct {
|
|||
Leechers uint32 `json:"leechers"`
|
||||
Completed uint32 `json:"completed"`
|
||||
LastScrape time.Time `json:"last_scrape"`
|
||||
FileList []File `json:"file_list"`
|
||||
FileList []FileJSON `json:"file_list"`
|
||||
}
|
||||
|
||||
// ToJSON converts a model.Torrent to its equivalent JSON structure
|
||||
|
@ -129,6 +129,10 @@ func (t *Torrent) ToJSON() TorrentJSON {
|
|||
for _, c := range t.Comments {
|
||||
commentsJSON = append(commentsJSON, CommentJSON{Username: c.User.Username, UserID: int(c.User.ID), Content: util.MarkdownToHTML(c.Content), Date: c.CreatedAt.UTC()})
|
||||
}
|
||||
fileListJSON := make([]FileJSON, 0, len(t.FileList))
|
||||
for _, f := range t.FileList {
|
||||
fileListJSON = append(fileListJSON, FileJSON{Path: f.Path, Filesize: util.FormatFilesize2(f.Filesize)})
|
||||
}
|
||||
uploader := ""
|
||||
if t.Uploader != nil {
|
||||
uploader = t.Uploader.Username
|
||||
|
@ -162,7 +166,7 @@ func (t *Torrent) ToJSON() TorrentJSON {
|
|||
Seeders: t.Seeders,
|
||||
Completed: t.Completed,
|
||||
LastScrape: t.LastScrape,
|
||||
FileList: t.FileList,
|
||||
FileList: fileListJSON,
|
||||
}
|
||||
|
||||
return res
|
||||
|
|
|
@ -24,6 +24,13 @@ import (
|
|||
"github.com/zeebo/bencode"
|
||||
)
|
||||
|
||||
// Use this, because we seem to avoid using models, and we would need
|
||||
// the torrent ID to create the File in the DB
|
||||
type UploadedFile struct {
|
||||
Path string
|
||||
Filesize int64
|
||||
}
|
||||
|
||||
// UploadForm serializing HTTP form for torrent upload
|
||||
type UploadForm struct {
|
||||
Name string
|
||||
|
@ -39,6 +46,7 @@ type UploadForm struct {
|
|||
SubCategoryID int
|
||||
Filesize int64
|
||||
Filepath string
|
||||
FileList []UploadedFile
|
||||
}
|
||||
|
||||
// TODO: these should be in another package (?)
|
||||
|
@ -150,6 +158,15 @@ func (f *UploadForm) ExtractInfo(r *http.Request) error {
|
|||
|
||||
// extract filesize
|
||||
f.Filesize = int64(torrent.TotalSize())
|
||||
|
||||
// extract filelist
|
||||
fileInfos := torrent.Info.GetFiles()
|
||||
for _, info := range fileInfos {
|
||||
f.FileList = append(f.FileList, UploadedFile{
|
||||
Path: info.Path.FilePath(),
|
||||
Filesize: int64(info.Length),
|
||||
})
|
||||
}
|
||||
} else {
|
||||
// No torrent file provided
|
||||
magnetUrl, err := url.Parse(string(f.Magnet)) //?
|
||||
|
|
|
@ -60,6 +60,18 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Description: uploadForm.Description,
|
||||
UploaderID: user.ID}
|
||||
db.ORM.Create(&torrent)
|
||||
|
||||
// add filelist to files db, if we have one
|
||||
if len(uploadForm.FileList) > 0 {
|
||||
for _, uploadedFile := range uploadForm.FileList {
|
||||
file := model.File{
|
||||
TorrentID: torrent.ID,
|
||||
Path: uploadedFile.Path,
|
||||
Filesize: uploadedFile.Filesize}
|
||||
db.ORM.Create(&file)
|
||||
}
|
||||
}
|
||||
|
||||
url, err := Router.Get("view_torrent").URL("id", strconv.FormatUint(uint64(torrent.ID), 10))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
|
|
@ -1,228 +0,0 @@
|
|||
package filesizeFetcher;
|
||||
|
||||
import (
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/ewhal/nyaa/config"
|
||||
"github.com/ewhal/nyaa/db"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
"github.com/ewhal/nyaa/util/log"
|
||||
serviceBase "github.com/ewhal/nyaa/service"
|
||||
torrentService "github.com/ewhal/nyaa/service/torrent"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FilesizeFetcher struct {
|
||||
torrentClient *torrent.Client
|
||||
results chan Result
|
||||
queueSize int
|
||||
timeout int
|
||||
maxDays int
|
||||
done chan int
|
||||
queue []*FetchOperation
|
||||
queueMutex sync.Mutex
|
||||
failedOperations map[uint]struct{}
|
||||
wakeUp *time.Ticker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(fetcherConfig *config.FilesizeFetcherConfig) (fetcher *FilesizeFetcher, err error) {
|
||||
client, err := torrent.NewClient(nil)
|
||||
fetcher = &FilesizeFetcher{
|
||||
torrentClient: client,
|
||||
results: make(chan Result, fetcherConfig.QueueSize),
|
||||
queueSize: fetcherConfig.QueueSize,
|
||||
timeout: fetcherConfig.Timeout,
|
||||
maxDays: fetcherConfig.MaxDays,
|
||||
done: make(chan int, 1),
|
||||
failedOperations: make(map[uint]struct{}),
|
||||
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) isFetchingOrFailed(t model.Torrent) bool {
|
||||
for _, op := range fetcher.queue {
|
||||
if op.torrent.ID == t.ID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
_, ok := fetcher.failedOperations[t.ID]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) addToQueue(op *FetchOperation) bool {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
if len(fetcher.queue) + 1 > fetcher.queueSize {
|
||||
return false
|
||||
}
|
||||
|
||||
fetcher.queue = append(fetcher.queue, op)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
func (fetcher *FilesizeFetcher) removeFromQueue(op *FetchOperation) bool {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
for i, queueOP := range fetcher.queue {
|
||||
if queueOP == op {
|
||||
fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error {
|
||||
log.Infof("TID %d has %d files.", dbEntry.ID, len(info.Files))
|
||||
for _, file := range info.Files {
|
||||
path := file.DisplayPath(info)
|
||||
fileExists := false
|
||||
for _, existingFile := range dbEntry.FileList {
|
||||
if existingFile.Path == path {
|
||||
fileExists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !fileExists {
|
||||
log.Infof("Adding file %s to filelist of TID %d", path, dbEntry.ID)
|
||||
dbFile := model.File{
|
||||
TorrentID: dbEntry.ID,
|
||||
Path: path,
|
||||
Filesize: file.Length,
|
||||
}
|
||||
|
||||
err := db.ORM.Create(&dbFile).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) gotResult(r Result) {
|
||||
updatedSuccessfully := false
|
||||
if r.err != nil {
|
||||
log.Infof("Failed to get torrent filesize (TID: %d), err %v", r.operation.torrent.ID, r.err)
|
||||
} else if r.info.TotalLength() == 0 {
|
||||
log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID)
|
||||
} else {
|
||||
log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID)
|
||||
r.operation.torrent.Filesize = r.info.TotalLength()
|
||||
_, err := torrentService.UpdateTorrent(r.operation.torrent)
|
||||
if err != nil {
|
||||
log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID)
|
||||
} else {
|
||||
updatedSuccessfully = true
|
||||
}
|
||||
|
||||
// Also update the File list with FileInfo, I guess.
|
||||
err = updateFileList(r.operation.torrent, r.info)
|
||||
if err != nil {
|
||||
log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if !updatedSuccessfully {
|
||||
fetcher.failedOperations[r.operation.torrent.ID] = struct{}{}
|
||||
}
|
||||
|
||||
fetcher.removeFromQueue(r.operation)
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) fillQueue() {
|
||||
toFill := fetcher.queueSize - len(fetcher.queue)
|
||||
|
||||
if toFill <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
oldest := time.Now().Add(0 - (time.Hour * time.Duration(24 * fetcher.maxDays)))
|
||||
params := serviceBase.CreateWhereParams("(filesize IS NULL OR filesize = 0) AND date > ?", oldest)
|
||||
// Get up to queueSize + len(failed) torrents, so we get at least some fresh new ones.
|
||||
dbTorrents, count, err := torrentService.GetTorrents(params, fetcher.queueSize + len(fetcher.failedOperations), 0)
|
||||
|
||||
if err != nil {
|
||||
log.Infof("Failed to get torrents for filesize updating")
|
||||
return
|
||||
}
|
||||
|
||||
if count == 0 {
|
||||
log.Infof("No torrents for filesize update")
|
||||
return
|
||||
}
|
||||
|
||||
for _, T := range dbTorrents {
|
||||
if fetcher.isFetchingOrFailed(T) {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("Added TID %d for filesize fetching", T.ID)
|
||||
operation := NewFetchOperation(fetcher, T)
|
||||
|
||||
if fetcher.addToQueue(operation) {
|
||||
fetcher.wg.Add(1)
|
||||
go operation.Start(fetcher.results)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) run() {
|
||||
var result Result
|
||||
|
||||
defer fetcher.wg.Done()
|
||||
|
||||
done := 0
|
||||
fetcher.fillQueue()
|
||||
for done == 0 {
|
||||
select {
|
||||
case done = <-fetcher.done:
|
||||
break
|
||||
case result = <-fetcher.results:
|
||||
fetcher.gotResult(result)
|
||||
fetcher.fillQueue()
|
||||
break
|
||||
case <-fetcher.wakeUp.C:
|
||||
fetcher.fillQueue()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) RunAsync() {
|
||||
fetcher.wg.Add(1)
|
||||
|
||||
go fetcher.run()
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) Close() error {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
// Send the done event to every Operation
|
||||
for _, op := range fetcher.queue {
|
||||
op.done <- 1
|
||||
}
|
||||
|
||||
fetcher.done <- 1
|
||||
fetcher.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) Wait() {
|
||||
fetcher.wg.Wait()
|
||||
}
|
||||
|
308
service/torrent/metainfoFetcher/metainfoFetcher.go
Fichier normal
308
service/torrent/metainfoFetcher/metainfoFetcher.go
Fichier normal
|
@ -0,0 +1,308 @@
|
|||
package metainfoFetcher;
|
||||
|
||||
import (
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/ewhal/nyaa/config"
|
||||
"github.com/ewhal/nyaa/db"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
"github.com/ewhal/nyaa/util/log"
|
||||
serviceBase "github.com/ewhal/nyaa/service"
|
||||
torrentService "github.com/ewhal/nyaa/service/torrent"
|
||||
"golang.org/x/time/rate"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MetainfoFetcher struct {
|
||||
torrentClient *torrent.Client
|
||||
results chan Result
|
||||
queueSize int
|
||||
timeout int
|
||||
maxDays int
|
||||
baseFailCooldown int
|
||||
maxFailCooldown int
|
||||
done chan int
|
||||
queue []*FetchOperation
|
||||
queueMutex sync.Mutex
|
||||
failedOperations map[uint]time.Time
|
||||
numFails map[uint]int
|
||||
failsMutex sync.Mutex
|
||||
wakeUp *time.Ticker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher, err error) {
|
||||
clientConfig := torrent.Config{}
|
||||
// Well, it seems this is the right way to convert speed -> rate.Limiter
|
||||
// https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go
|
||||
if fetcherConfig.UploadRateLimiter != -1 {
|
||||
clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.UploadRateLimiter * 1024), 256<<10)
|
||||
}
|
||||
if fetcherConfig.DownloadRateLimiter != -1 {
|
||||
clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.DownloadRateLimiter * 1024), 1<<20)
|
||||
}
|
||||
|
||||
client, err := torrent.NewClient(&clientConfig)
|
||||
|
||||
fetcher = &MetainfoFetcher{
|
||||
torrentClient: client,
|
||||
results: make(chan Result, fetcherConfig.QueueSize),
|
||||
queueSize: fetcherConfig.QueueSize,
|
||||
timeout: fetcherConfig.Timeout,
|
||||
maxDays: fetcherConfig.MaxDays,
|
||||
baseFailCooldown: fetcherConfig.BaseFailCooldown,
|
||||
maxFailCooldown: fetcherConfig.MaxFailCooldown,
|
||||
done: make(chan int, 1),
|
||||
failedOperations: make(map[uint]time.Time),
|
||||
numFails: make(map[uint]int),
|
||||
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) isFetchingOrFailed(t model.Torrent) bool {
|
||||
for _, op := range fetcher.queue {
|
||||
if op.torrent.ID == t.ID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
_, ok := fetcher.failedOperations[t.ID]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) addToQueue(op *FetchOperation) bool {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
if len(fetcher.queue) + 1 > fetcher.queueSize {
|
||||
return false
|
||||
}
|
||||
|
||||
fetcher.queue = append(fetcher.queue, op)
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
func (fetcher *MetainfoFetcher) removeFromQueue(op *FetchOperation) bool {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
for i, queueOP := range fetcher.queue {
|
||||
if queueOP == op {
|
||||
fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) markAsFailed(tID uint) {
|
||||
fetcher.failsMutex.Lock()
|
||||
defer fetcher.failsMutex.Unlock()
|
||||
|
||||
if n, ok := fetcher.numFails[tID]; ok {
|
||||
fetcher.numFails[tID] = n + 1
|
||||
} else {
|
||||
fetcher.numFails[tID] = 1
|
||||
}
|
||||
|
||||
fetcher.failedOperations[tID] = time.Now()
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) {
|
||||
fetcher.failsMutex.Lock()
|
||||
defer fetcher.failsMutex.Unlock()
|
||||
|
||||
delete(fetcher.failedOperations, tID)
|
||||
}
|
||||
|
||||
func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error {
|
||||
torrentFiles := info.UpvertedFiles()
|
||||
log.Infof("TID %d has %d files.", dbEntry.ID, len(torrentFiles))
|
||||
for _, file := range torrentFiles {
|
||||
path := file.DisplayPath(info)
|
||||
|
||||
// Can't read FileList from the GetTorrents output, rely on the unique_index
|
||||
// to ensure no files are duplicated.
|
||||
log.Infof("Adding file %s to filelist of TID %d", path, dbEntry.ID)
|
||||
dbFile := model.File{
|
||||
TorrentID: dbEntry.ID,
|
||||
Path: path,
|
||||
Filesize: file.Length,
|
||||
}
|
||||
|
||||
err := db.ORM.Create(&dbFile).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) gotResult(r Result) {
|
||||
updatedSuccessfully := false
|
||||
if r.err != nil {
|
||||
log.Infof("Failed to get torrent metainfo (TID: %d), err %v", r.operation.torrent.ID, r.err)
|
||||
} else if r.info.TotalLength() == 0 {
|
||||
log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID)
|
||||
} else {
|
||||
lengthOK := true
|
||||
|
||||
if r.operation.torrent.Filesize != r.info.TotalLength() {
|
||||
log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID)
|
||||
r.operation.torrent.Filesize = r.info.TotalLength()
|
||||
_, err := torrentService.UpdateTorrent(r.operation.torrent)
|
||||
if err != nil {
|
||||
log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID)
|
||||
lengthOK = false
|
||||
}
|
||||
}
|
||||
|
||||
filelistOK := true
|
||||
// Create the file list, if it's missing.
|
||||
if len(r.operation.torrent.FileList) == 0 {
|
||||
err := updateFileList(r.operation.torrent, r.info)
|
||||
if err != nil {
|
||||
log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID)
|
||||
filelistOK = false
|
||||
}
|
||||
}
|
||||
|
||||
updatedSuccessfully = lengthOK && filelistOK
|
||||
}
|
||||
|
||||
if !updatedSuccessfully {
|
||||
fetcher.markAsFailed(r.operation.torrent.ID)
|
||||
}
|
||||
|
||||
fetcher.removeFromQueue(r.operation)
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) removeOldFailures() {
|
||||
// Cooldown is disabled
|
||||
if fetcher.baseFailCooldown < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
maxCd := time.Duration(fetcher.maxFailCooldown) * time.Second
|
||||
now := time.Now()
|
||||
for id, failTime := range fetcher.failedOperations {
|
||||
cdMult := int(math.Pow(2, float64(fetcher.numFails[id] - 1)))
|
||||
cd := time.Duration(cdMult * fetcher.baseFailCooldown) * time.Second
|
||||
if cd > maxCd {
|
||||
cd = maxCd
|
||||
}
|
||||
|
||||
if failTime.Add(cd).Before(now) {
|
||||
log.Infof("Torrent TID %d gone through cooldown, removing from failures", id)
|
||||
// Deleting keys inside a loop seems to be safe.
|
||||
fetcher.removeFromFailed(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) fillQueue() {
|
||||
toFill := fetcher.queueSize - len(fetcher.queue)
|
||||
|
||||
if toFill <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
oldest := time.Now().Add(0 - (time.Hour * time.Duration(24 * fetcher.maxDays)))
|
||||
excludedIDS := make([]uint, 0, len(fetcher.failedOperations))
|
||||
for id, _ := range fetcher.failedOperations {
|
||||
excludedIDS = append(excludedIDS, id)
|
||||
}
|
||||
|
||||
// Nice query lol
|
||||
// Select the torrents with no filesize, or without any rows with torrent_id in the files table, that are younger than fetcher.MaxDays
|
||||
var params serviceBase.WhereParams
|
||||
|
||||
if len(excludedIDS) > 0 {
|
||||
params = serviceBase.CreateWhereParams("((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ? AND torrent_id NOT IN (?)", oldest, excludedIDS)
|
||||
} else {
|
||||
params = serviceBase.CreateWhereParams("((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ?", oldest)
|
||||
}
|
||||
dbTorrents, err := torrentService.GetTorrentsOrderByNoCount(¶ms, "", fetcher.queueSize, 0)
|
||||
|
||||
if len(dbTorrents) == 0 {
|
||||
log.Infof("No torrents for filesize update")
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Infof("Failed to get torrents for metainfo updating")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
for _, T := range dbTorrents {
|
||||
if fetcher.isFetchingOrFailed(T) {
|
||||
continue
|
||||
}
|
||||
|
||||
operation := NewFetchOperation(fetcher, T)
|
||||
if fetcher.addToQueue(operation) {
|
||||
log.Infof("Added TID %d for filesize fetching", T.ID)
|
||||
fetcher.wg.Add(1)
|
||||
go operation.Start(fetcher.results)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) run() {
|
||||
var result Result
|
||||
|
||||
defer fetcher.wg.Done()
|
||||
|
||||
done := 0
|
||||
fetcher.fillQueue()
|
||||
for done == 0 {
|
||||
fetcher.removeOldFailures()
|
||||
fetcher.fillQueue()
|
||||
select {
|
||||
case done = <-fetcher.done:
|
||||
break
|
||||
case result = <-fetcher.results:
|
||||
fetcher.gotResult(result)
|
||||
break
|
||||
case <-fetcher.wakeUp.C:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) RunAsync() {
|
||||
fetcher.wg.Add(1)
|
||||
|
||||
go fetcher.run()
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) Close() error {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
||||
// Send the done event to every Operation
|
||||
for _, op := range fetcher.queue {
|
||||
op.done <- 1
|
||||
}
|
||||
|
||||
fetcher.done <- 1
|
||||
fetcher.torrentClient.Close()
|
||||
log.Infof("Send done signal to everyone, waiting...")
|
||||
fetcher.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fetcher *MetainfoFetcher) Wait() {
|
||||
fetcher.wg.Wait()
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package filesizeFetcher;
|
||||
package metainfoFetcher;
|
||||
|
||||
import (
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
type FetchOperation struct {
|
||||
fetcher *FilesizeFetcher
|
||||
fetcher *MetainfoFetcher
|
||||
torrent model.Torrent
|
||||
done chan int
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ type Result struct {
|
|||
info *metainfo.Info
|
||||
}
|
||||
|
||||
func NewFetchOperation(fetcher *FilesizeFetcher, dbEntry model.Torrent) (op *FetchOperation) {
|
||||
func NewFetchOperation(fetcher *MetainfoFetcher, dbEntry model.Torrent) (op *FetchOperation) {
|
||||
op = &FetchOperation{
|
||||
fetcher: fetcher,
|
||||
torrent: dbEntry,
|
||||
|
@ -41,20 +41,20 @@ func (op *FetchOperation) Start(out chan Result) {
|
|||
out <- Result{op, err, nil}
|
||||
return
|
||||
}
|
||||
defer downloadingTorrent.Drop()
|
||||
|
||||
timeoutTicker := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout))
|
||||
timeoutTimer := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout))
|
||||
defer timeoutTimer.Stop()
|
||||
select {
|
||||
case <-downloadingTorrent.GotInfo():
|
||||
downloadingTorrent.Drop()
|
||||
out <- Result{op, nil, downloadingTorrent.Info()}
|
||||
break
|
||||
case <-timeoutTicker.C:
|
||||
downloadingTorrent.Drop()
|
||||
case <-timeoutTimer.C:
|
||||
out <- Result{op, errors.New("Timeout"), nil}
|
||||
break
|
||||
case <-op.done:
|
||||
downloadingTorrent.Drop()
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package filesizeFetcher;
|
||||
package metainfoFetcher;
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
@ -13,7 +13,7 @@ func TestInvalidHash(t *testing.T) {
|
|||
t.Skipf("Failed to create client, with err %v. Skipping.", err)
|
||||
}
|
||||
|
||||
fetcher := &FilesizeFetcher{
|
||||
fetcher := &MetainfoFetcher{
|
||||
timeout: 5,
|
||||
torrentClient: client,
|
||||
results: make(chan Result, 1),
|
|
@ -53,7 +53,7 @@ func GetTorrentById(id string) (torrent model.Torrent, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
tmp := db.ORM.Where("torrent_id = ?", id).Preload("Comments")
|
||||
tmp := db.ORM.Where("torrent_id = ?", id).Preload("Comments").Preload("FileList")
|
||||
err = tmp.Error
|
||||
if err != nil {
|
||||
return
|
||||
|
|
|
@ -82,6 +82,27 @@
|
|||
<hr>
|
||||
</div>
|
||||
</div>
|
||||
{{ if gt (len .FileList) 0 }}
|
||||
<div class="row" id="files">
|
||||
<div class="col-md-12">
|
||||
<h4>{{T "files"}}</h4>
|
||||
<div class="table-responsive">
|
||||
<table class="table custom-table-hover">
|
||||
<tr>
|
||||
<th class="col-xs-8">{{T "filename"}}</th>
|
||||
<th class="col-xs-1">{{T "size"}}</th>
|
||||
</tr>
|
||||
{{ range .FileList }}
|
||||
<tr>
|
||||
<td>{{.Path}}</td>
|
||||
<td>{{.Filesize}}</td>
|
||||
</tr>
|
||||
{{ end }}
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{{ end }}
|
||||
<div class="row" id="comments">
|
||||
<div class="col-md-12">
|
||||
<div class="commentList">
|
||||
|
|
|
@ -647,6 +647,14 @@
|
|||
"id": "delete",
|
||||
"translation": "Delete"
|
||||
},
|
||||
{
|
||||
"id": "files",
|
||||
"translation": "Files"
|
||||
},
|
||||
{
|
||||
"id": "filename",
|
||||
"translation": "Filename"
|
||||
},
|
||||
{
|
||||
"id": "uploaded_by",
|
||||
"translation": "Uploaded by"
|
||||
|
|
Référencer dans un nouveau ticket