diff --git a/config/config.go b/config/config.go index 5f805724..a297ce45 100644 --- a/config/config.go +++ b/config/config.go @@ -32,12 +32,12 @@ type Config struct { // optional i2p configuration I2P *I2PConfig `json:"i2p"` // filesize fetcher config - FilesizeFetcher FilesizeFetcherConfig `json:"filesize_fetcher"` + MetainfoFetcher MetainfoFetcherConfig `json:"metainfo_fetcher"` // internationalization config I18n I18nConfig `json:"i18n"` } -var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil, DefaultFilesizeFetcherConfig, DefaultI18nConfig} +var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil, DefaultMetainfoFetcherConfig, DefaultI18nConfig} var allowedDatabaseTypes = map[string]bool{ "sqlite3": true, @@ -61,7 +61,7 @@ func New() *Config { config.DBLogMode = Defaults.DBLogMode config.Scrape = Defaults.Scrape config.Cache = Defaults.Cache - config.FilesizeFetcher = Defaults.FilesizeFetcher + config.MetainfoFetcher = Defaults.MetainfoFetcher config.I18n = Defaults.I18n return &config } diff --git a/config/filesizeFetcher.go b/config/filesizeFetcher.go deleted file mode 100644 index c86082fb..00000000 --- a/config/filesizeFetcher.go +++ /dev/null @@ -1,16 +0,0 @@ -package config - -type FilesizeFetcherConfig struct { - QueueSize int `json:"queue_size"` - Timeout int `json:"timeout"` - MaxDays int `json:"max_days"` - WakeUpInterval int `json:"wake_up_interval"` -} - -var DefaultFilesizeFetcherConfig = FilesizeFetcherConfig{ - QueueSize: 10, - Timeout: 120, // 2 min - MaxDays: 90, - WakeUpInterval: 300, // 5 min -} - diff --git a/config/metainfoFetcher.go b/config/metainfoFetcher.go new file mode 100644 index 00000000..a65f2859 --- /dev/null +++ b/config/metainfoFetcher.go @@ -0,0 +1,26 @@ +package config + +type MetainfoFetcherConfig struct { + QueueSize int `json:"queue_size"` + Timeout int `json:"timeout"` + MaxDays int `json:"max_days"` + BaseFailCooldown int `json:"base_fail_cooldown"` + MaxFailCooldown int `json:"max_fail_cooldown"` + WakeUpInterval int `json:"wake_up_interval"` + + UploadRateLimiter int `json:"upload_rate_limiter"` + DownloadRateLimiter int `json:"download_rate_limiter"` +} + +var DefaultMetainfoFetcherConfig = MetainfoFetcherConfig{ + QueueSize: 10, + Timeout: 120, // 2 min + MaxDays: 90, + BaseFailCooldown: 30 * 60, // in seconds, when failed torrents will be able to be fetched again. + MaxFailCooldown: 48 * 60 * 60, + WakeUpInterval: 300, // 5 min + + UploadRateLimiter: 1024, // kbps + DownloadRateLimiter: 1024, +} + diff --git a/db/gorm.go b/db/gorm.go index 88c7db2c..74c42a07 100644 --- a/db/gorm.go +++ b/db/gorm.go @@ -58,7 +58,11 @@ func GormInit(conf *config.Config, logger Logger) (*gorm.DB, error) { if db.Error != nil { return db, db.Error } - db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{}, &model.File{}) + db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{}) + if db.Error != nil { + return db, db.Error + } + db.AutoMigrate(&model.File{}) if db.Error != nil { return db, db.Error } diff --git a/main.go b/main.go index bb3bd4d4..a24c1f53 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ import ( "github.com/ewhal/nyaa/network" "github.com/ewhal/nyaa/router" "github.com/ewhal/nyaa/service/scraper" - "github.com/ewhal/nyaa/service/torrent/filesizeFetcher" + "github.com/ewhal/nyaa/service/torrent/metainfoFetcher" "github.com/ewhal/nyaa/service/user" "github.com/ewhal/nyaa/util/languages" "github.com/ewhal/nyaa/util/log" @@ -84,9 +84,9 @@ func RunScraper(conf *config.Config) { scraper.Wait() } -// RunFilesizeFetcher runs the database filesize fetcher main loop -func RunFilesizeFetcher(conf *config.Config) { - fetcher, err := filesizeFetcher.New(&conf.FilesizeFetcher) +// RunMetainfoFetcher runs the database filesize fetcher main loop +func RunMetainfoFetcher(conf *config.Config) { + fetcher, err := metainfoFetcher.New(&conf.MetainfoFetcher) if err != nil { log.Fatalf("failed to start fetcher, %s", err) return @@ -148,8 +148,8 @@ func main() { RunScraper(conf) } else if *mode == "webapp" { RunServer(conf) - } else if *mode == "metadata_fetcher" { - RunFilesizeFetcher(conf) + } else if *mode == "metainfo_fetcher" { + RunMetainfoFetcher(conf) } else { log.Fatalf("invalid runtime mode: %s", *mode) } diff --git a/model/file.go b/model/file.go index 4a95c6dc..67b64168 100644 --- a/model/file.go +++ b/model/file.go @@ -2,15 +2,13 @@ package model type File struct { ID uint `gorm:"column:file_id;primary_key"` - TorrentID uint `gorm:"column:torrent_id"` - Path string `gorm:"column:path"` - Filesize int64 `gorm:"column:filesize"` - - Torrent *Torrent `gorm:"AssociationForeignKey:TorrentID;ForeignKey:torrent_id"` + TorrentID uint `gorm:"column:torrent_id;unique_index:idx_tid_path"` + Path string `gorm:"column:path;unique_index:idx_tid_path"` + Filesize int64 `gorm:"column:filesize"` } // Returns the total size of memory allocated for this struct func (f File) Size() int { - return (1 + len(f.Path) + 2) * 8; + return (2 + len(f.Path) + 2) * 8; } diff --git a/model/torrent.go b/model/torrent.go index 50e83031..de081167 100644 --- a/model/torrent.go +++ b/model/torrent.go @@ -90,8 +90,8 @@ type CommentJSON struct { } type FileJSON struct { - Path string `json:"path"` - Length int64 `json:"length"` + Path string `json:"path"` + Filesize string `json:"filesize"` } type TorrentJSON struct { @@ -116,7 +116,7 @@ type TorrentJSON struct { Leechers uint32 `json:"leechers"` Completed uint32 `json:"completed"` LastScrape time.Time `json:"last_scrape"` - FileList []File `json:"file_list"` + FileList []FileJSON `json:"file_list"` } // ToJSON converts a model.Torrent to its equivalent JSON structure @@ -129,6 +129,10 @@ func (t *Torrent) ToJSON() TorrentJSON { for _, c := range t.Comments { commentsJSON = append(commentsJSON, CommentJSON{Username: c.User.Username, UserID: int(c.User.ID), Content: util.MarkdownToHTML(c.Content), Date: c.CreatedAt.UTC()}) } + fileListJSON := make([]FileJSON, 0, len(t.FileList)) + for _, f := range t.FileList { + fileListJSON = append(fileListJSON, FileJSON{Path: f.Path, Filesize: util.FormatFilesize2(f.Filesize)}) + } uploader := "" if t.Uploader != nil { uploader = t.Uploader.Username @@ -162,7 +166,7 @@ func (t *Torrent) ToJSON() TorrentJSON { Seeders: t.Seeders, Completed: t.Completed, LastScrape: t.LastScrape, - FileList: t.FileList, + FileList: fileListJSON, } return res diff --git a/router/upload.go b/router/upload.go index 2b143da9..57a65759 100644 --- a/router/upload.go +++ b/router/upload.go @@ -24,6 +24,13 @@ import ( "github.com/zeebo/bencode" ) +// Use this, because we seem to avoid using models, and we would need +// the torrent ID to create the File in the DB +type UploadedFile struct { + Path string + Filesize int64 +} + // UploadForm serializing HTTP form for torrent upload type UploadForm struct { Name string @@ -39,6 +46,7 @@ type UploadForm struct { SubCategoryID int Filesize int64 Filepath string + FileList []UploadedFile } // TODO: these should be in another package (?) @@ -150,6 +158,15 @@ func (f *UploadForm) ExtractInfo(r *http.Request) error { // extract filesize f.Filesize = int64(torrent.TotalSize()) + + // extract filelist + fileInfos := torrent.Info.GetFiles() + for _, info := range fileInfos { + f.FileList = append(f.FileList, UploadedFile{ + Path: info.Path.FilePath(), + Filesize: int64(info.Length), + }) + } } else { // No torrent file provided magnetUrl, err := url.Parse(string(f.Magnet)) //? diff --git a/router/uploadHandler.go b/router/uploadHandler.go index 5afcd3fe..ea07c62c 100644 --- a/router/uploadHandler.go +++ b/router/uploadHandler.go @@ -60,6 +60,18 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) { Description: uploadForm.Description, UploaderID: user.ID} db.ORM.Create(&torrent) + + // add filelist to files db, if we have one + if len(uploadForm.FileList) > 0 { + for _, uploadedFile := range uploadForm.FileList { + file := model.File{ + TorrentID: torrent.ID, + Path: uploadedFile.Path, + Filesize: uploadedFile.Filesize} + db.ORM.Create(&file) + } + } + url, err := Router.Get("view_torrent").URL("id", strconv.FormatUint(uint64(torrent.ID), 10)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/service/torrent/filesizeFetcher/filesizeFetcher.go b/service/torrent/filesizeFetcher/filesizeFetcher.go deleted file mode 100644 index 1f56f0c3..00000000 --- a/service/torrent/filesizeFetcher/filesizeFetcher.go +++ /dev/null @@ -1,228 +0,0 @@ -package filesizeFetcher; - -import ( - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/metainfo" - "github.com/ewhal/nyaa/config" - "github.com/ewhal/nyaa/db" - "github.com/ewhal/nyaa/model" - "github.com/ewhal/nyaa/util/log" - serviceBase "github.com/ewhal/nyaa/service" - torrentService "github.com/ewhal/nyaa/service/torrent" - "sync" - "time" -) - -type FilesizeFetcher struct { - torrentClient *torrent.Client - results chan Result - queueSize int - timeout int - maxDays int - done chan int - queue []*FetchOperation - queueMutex sync.Mutex - failedOperations map[uint]struct{} - wakeUp *time.Ticker - wg sync.WaitGroup -} - -func New(fetcherConfig *config.FilesizeFetcherConfig) (fetcher *FilesizeFetcher, err error) { - client, err := torrent.NewClient(nil) - fetcher = &FilesizeFetcher{ - torrentClient: client, - results: make(chan Result, fetcherConfig.QueueSize), - queueSize: fetcherConfig.QueueSize, - timeout: fetcherConfig.Timeout, - maxDays: fetcherConfig.MaxDays, - done: make(chan int, 1), - failedOperations: make(map[uint]struct{}), - wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)), - } - - return -} - -func (fetcher *FilesizeFetcher) isFetchingOrFailed(t model.Torrent) bool { - for _, op := range fetcher.queue { - if op.torrent.ID == t.ID { - return true - } - } - - _, ok := fetcher.failedOperations[t.ID] - return ok -} - -func (fetcher *FilesizeFetcher) addToQueue(op *FetchOperation) bool { - fetcher.queueMutex.Lock() - defer fetcher.queueMutex.Unlock() - - if len(fetcher.queue) + 1 > fetcher.queueSize { - return false - } - - fetcher.queue = append(fetcher.queue, op) - return true -} - - -func (fetcher *FilesizeFetcher) removeFromQueue(op *FetchOperation) bool { - fetcher.queueMutex.Lock() - defer fetcher.queueMutex.Unlock() - - for i, queueOP := range fetcher.queue { - if queueOP == op { - fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...) - return true - } - } - - return false -} - -func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error { - log.Infof("TID %d has %d files.", dbEntry.ID, len(info.Files)) - for _, file := range info.Files { - path := file.DisplayPath(info) - fileExists := false - for _, existingFile := range dbEntry.FileList { - if existingFile.Path == path { - fileExists = true - break - } - } - - if !fileExists { - log.Infof("Adding file %s to filelist of TID %d", path, dbEntry.ID) - dbFile := model.File{ - TorrentID: dbEntry.ID, - Path: path, - Filesize: file.Length, - } - - err := db.ORM.Create(&dbFile).Error - if err != nil { - return err - } - } - } - - return nil -} - -func (fetcher *FilesizeFetcher) gotResult(r Result) { - updatedSuccessfully := false - if r.err != nil { - log.Infof("Failed to get torrent filesize (TID: %d), err %v", r.operation.torrent.ID, r.err) - } else if r.info.TotalLength() == 0 { - log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID) - } else { - log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID) - r.operation.torrent.Filesize = r.info.TotalLength() - _, err := torrentService.UpdateTorrent(r.operation.torrent) - if err != nil { - log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID) - } else { - updatedSuccessfully = true - } - - // Also update the File list with FileInfo, I guess. - err = updateFileList(r.operation.torrent, r.info) - if err != nil { - log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID) - } - } - - if !updatedSuccessfully { - fetcher.failedOperations[r.operation.torrent.ID] = struct{}{} - } - - fetcher.removeFromQueue(r.operation) -} - -func (fetcher *FilesizeFetcher) fillQueue() { - toFill := fetcher.queueSize - len(fetcher.queue) - - if toFill <= 0 { - return - } - - oldest := time.Now().Add(0 - (time.Hour * time.Duration(24 * fetcher.maxDays))) - params := serviceBase.CreateWhereParams("(filesize IS NULL OR filesize = 0) AND date > ?", oldest) - // Get up to queueSize + len(failed) torrents, so we get at least some fresh new ones. - dbTorrents, count, err := torrentService.GetTorrents(params, fetcher.queueSize + len(fetcher.failedOperations), 0) - - if err != nil { - log.Infof("Failed to get torrents for filesize updating") - return - } - - if count == 0 { - log.Infof("No torrents for filesize update") - return - } - - for _, T := range dbTorrents { - if fetcher.isFetchingOrFailed(T) { - continue - } - - log.Infof("Added TID %d for filesize fetching", T.ID) - operation := NewFetchOperation(fetcher, T) - - if fetcher.addToQueue(operation) { - fetcher.wg.Add(1) - go operation.Start(fetcher.results) - } else { - break - } - } -} - -func (fetcher *FilesizeFetcher) run() { - var result Result - - defer fetcher.wg.Done() - - done := 0 - fetcher.fillQueue() - for done == 0 { - select { - case done = <-fetcher.done: - break - case result = <-fetcher.results: - fetcher.gotResult(result) - fetcher.fillQueue() - break - case <-fetcher.wakeUp.C: - fetcher.fillQueue() - break - } - } -} - -func (fetcher *FilesizeFetcher) RunAsync() { - fetcher.wg.Add(1) - - go fetcher.run() -} - -func (fetcher *FilesizeFetcher) Close() error { - fetcher.queueMutex.Lock() - defer fetcher.queueMutex.Unlock() - - // Send the done event to every Operation - for _, op := range fetcher.queue { - op.done <- 1 - } - - fetcher.done <- 1 - fetcher.wg.Wait() - return nil -} - -func (fetcher *FilesizeFetcher) Wait() { - fetcher.wg.Wait() -} - diff --git a/service/torrent/metainfoFetcher/metainfoFetcher.go b/service/torrent/metainfoFetcher/metainfoFetcher.go new file mode 100644 index 00000000..8da54513 --- /dev/null +++ b/service/torrent/metainfoFetcher/metainfoFetcher.go @@ -0,0 +1,308 @@ +package metainfoFetcher; + +import ( + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" + "github.com/ewhal/nyaa/config" + "github.com/ewhal/nyaa/db" + "github.com/ewhal/nyaa/model" + "github.com/ewhal/nyaa/util/log" + serviceBase "github.com/ewhal/nyaa/service" + torrentService "github.com/ewhal/nyaa/service/torrent" + "golang.org/x/time/rate" + "math" + "sync" + "time" +) + +type MetainfoFetcher struct { + torrentClient *torrent.Client + results chan Result + queueSize int + timeout int + maxDays int + baseFailCooldown int + maxFailCooldown int + done chan int + queue []*FetchOperation + queueMutex sync.Mutex + failedOperations map[uint]time.Time + numFails map[uint]int + failsMutex sync.Mutex + wakeUp *time.Ticker + wg sync.WaitGroup +} + +func New(fetcherConfig *config.MetainfoFetcherConfig) (fetcher *MetainfoFetcher, err error) { + clientConfig := torrent.Config{} + // Well, it seems this is the right way to convert speed -> rate.Limiter + // https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go + if fetcherConfig.UploadRateLimiter != -1 { + clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.UploadRateLimiter * 1024), 256<<10) + } + if fetcherConfig.DownloadRateLimiter != -1 { + clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(fetcherConfig.DownloadRateLimiter * 1024), 1<<20) + } + + client, err := torrent.NewClient(&clientConfig) + + fetcher = &MetainfoFetcher{ + torrentClient: client, + results: make(chan Result, fetcherConfig.QueueSize), + queueSize: fetcherConfig.QueueSize, + timeout: fetcherConfig.Timeout, + maxDays: fetcherConfig.MaxDays, + baseFailCooldown: fetcherConfig.BaseFailCooldown, + maxFailCooldown: fetcherConfig.MaxFailCooldown, + done: make(chan int, 1), + failedOperations: make(map[uint]time.Time), + numFails: make(map[uint]int), + wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)), + } + + return +} + +func (fetcher *MetainfoFetcher) isFetchingOrFailed(t model.Torrent) bool { + for _, op := range fetcher.queue { + if op.torrent.ID == t.ID { + return true + } + } + + _, ok := fetcher.failedOperations[t.ID] + return ok +} + +func (fetcher *MetainfoFetcher) addToQueue(op *FetchOperation) bool { + fetcher.queueMutex.Lock() + defer fetcher.queueMutex.Unlock() + + if len(fetcher.queue) + 1 > fetcher.queueSize { + return false + } + + fetcher.queue = append(fetcher.queue, op) + return true +} + + +func (fetcher *MetainfoFetcher) removeFromQueue(op *FetchOperation) bool { + fetcher.queueMutex.Lock() + defer fetcher.queueMutex.Unlock() + + for i, queueOP := range fetcher.queue { + if queueOP == op { + fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...) + return true + } + } + + return false +} + +func (fetcher *MetainfoFetcher) markAsFailed(tID uint) { + fetcher.failsMutex.Lock() + defer fetcher.failsMutex.Unlock() + + if n, ok := fetcher.numFails[tID]; ok { + fetcher.numFails[tID] = n + 1 + } else { + fetcher.numFails[tID] = 1 + } + + fetcher.failedOperations[tID] = time.Now() +} + +func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) { + fetcher.failsMutex.Lock() + defer fetcher.failsMutex.Unlock() + + delete(fetcher.failedOperations, tID) +} + +func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error { + torrentFiles := info.UpvertedFiles() + log.Infof("TID %d has %d files.", dbEntry.ID, len(torrentFiles)) + for _, file := range torrentFiles { + path := file.DisplayPath(info) + + // Can't read FileList from the GetTorrents output, rely on the unique_index + // to ensure no files are duplicated. + log.Infof("Adding file %s to filelist of TID %d", path, dbEntry.ID) + dbFile := model.File{ + TorrentID: dbEntry.ID, + Path: path, + Filesize: file.Length, + } + + err := db.ORM.Create(&dbFile).Error + if err != nil { + return err + } + } + + return nil +} + +func (fetcher *MetainfoFetcher) gotResult(r Result) { + updatedSuccessfully := false + if r.err != nil { + log.Infof("Failed to get torrent metainfo (TID: %d), err %v", r.operation.torrent.ID, r.err) + } else if r.info.TotalLength() == 0 { + log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID) + } else { + lengthOK := true + + if r.operation.torrent.Filesize != r.info.TotalLength() { + log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID) + r.operation.torrent.Filesize = r.info.TotalLength() + _, err := torrentService.UpdateTorrent(r.operation.torrent) + if err != nil { + log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID) + lengthOK = false + } + } + + filelistOK := true + // Create the file list, if it's missing. + if len(r.operation.torrent.FileList) == 0 { + err := updateFileList(r.operation.torrent, r.info) + if err != nil { + log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID) + filelistOK = false + } + } + + updatedSuccessfully = lengthOK && filelistOK + } + + if !updatedSuccessfully { + fetcher.markAsFailed(r.operation.torrent.ID) + } + + fetcher.removeFromQueue(r.operation) +} + +func (fetcher *MetainfoFetcher) removeOldFailures() { + // Cooldown is disabled + if fetcher.baseFailCooldown < 0 { + return + } + + maxCd := time.Duration(fetcher.maxFailCooldown) * time.Second + now := time.Now() + for id, failTime := range fetcher.failedOperations { + cdMult := int(math.Pow(2, float64(fetcher.numFails[id] - 1))) + cd := time.Duration(cdMult * fetcher.baseFailCooldown) * time.Second + if cd > maxCd { + cd = maxCd + } + + if failTime.Add(cd).Before(now) { + log.Infof("Torrent TID %d gone through cooldown, removing from failures", id) + // Deleting keys inside a loop seems to be safe. + fetcher.removeFromFailed(id) + } + } +} + +func (fetcher *MetainfoFetcher) fillQueue() { + toFill := fetcher.queueSize - len(fetcher.queue) + + if toFill <= 0 { + return + } + + oldest := time.Now().Add(0 - (time.Hour * time.Duration(24 * fetcher.maxDays))) + excludedIDS := make([]uint, 0, len(fetcher.failedOperations)) + for id, _ := range fetcher.failedOperations { + excludedIDS = append(excludedIDS, id) + } + + // Nice query lol + // Select the torrents with no filesize, or without any rows with torrent_id in the files table, that are younger than fetcher.MaxDays + var params serviceBase.WhereParams + + if len(excludedIDS) > 0 { + params = serviceBase.CreateWhereParams("((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ? AND torrent_id NOT IN (?)", oldest, excludedIDS) + } else { + params = serviceBase.CreateWhereParams("((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ?", oldest) + } + dbTorrents, err := torrentService.GetTorrentsOrderByNoCount(¶ms, "", fetcher.queueSize, 0) + + if len(dbTorrents) == 0 { + log.Infof("No torrents for filesize update") + return + } + + if err != nil { + log.Infof("Failed to get torrents for metainfo updating") + return + } + + + for _, T := range dbTorrents { + if fetcher.isFetchingOrFailed(T) { + continue + } + + operation := NewFetchOperation(fetcher, T) + if fetcher.addToQueue(operation) { + log.Infof("Added TID %d for filesize fetching", T.ID) + fetcher.wg.Add(1) + go operation.Start(fetcher.results) + } else { + break + } + } +} + +func (fetcher *MetainfoFetcher) run() { + var result Result + + defer fetcher.wg.Done() + + done := 0 + fetcher.fillQueue() + for done == 0 { + fetcher.removeOldFailures() + fetcher.fillQueue() + select { + case done = <-fetcher.done: + break + case result = <-fetcher.results: + fetcher.gotResult(result) + break + case <-fetcher.wakeUp.C: + break + } + } +} + +func (fetcher *MetainfoFetcher) RunAsync() { + fetcher.wg.Add(1) + + go fetcher.run() +} + +func (fetcher *MetainfoFetcher) Close() error { + fetcher.queueMutex.Lock() + defer fetcher.queueMutex.Unlock() + + // Send the done event to every Operation + for _, op := range fetcher.queue { + op.done <- 1 + } + + fetcher.done <- 1 + fetcher.torrentClient.Close() + log.Infof("Send done signal to everyone, waiting...") + fetcher.wg.Wait() + return nil +} + +func (fetcher *MetainfoFetcher) Wait() { + fetcher.wg.Wait() +} + diff --git a/service/torrent/filesizeFetcher/operation.go b/service/torrent/metainfoFetcher/operation.go similarity index 77% rename from service/torrent/filesizeFetcher/operation.go rename to service/torrent/metainfoFetcher/operation.go index 1a08d061..92eada2e 100644 --- a/service/torrent/filesizeFetcher/operation.go +++ b/service/torrent/metainfoFetcher/operation.go @@ -1,4 +1,4 @@ -package filesizeFetcher; +package metainfoFetcher; import ( "github.com/anacrolix/torrent/metainfo" @@ -11,7 +11,7 @@ import ( ) type FetchOperation struct { - fetcher *FilesizeFetcher + fetcher *MetainfoFetcher torrent model.Torrent done chan int } @@ -22,7 +22,7 @@ type Result struct { info *metainfo.Info } -func NewFetchOperation(fetcher *FilesizeFetcher, dbEntry model.Torrent) (op *FetchOperation) { +func NewFetchOperation(fetcher *MetainfoFetcher, dbEntry model.Torrent) (op *FetchOperation) { op = &FetchOperation{ fetcher: fetcher, torrent: dbEntry, @@ -41,20 +41,20 @@ func (op *FetchOperation) Start(out chan Result) { out <- Result{op, err, nil} return } + defer downloadingTorrent.Drop() - timeoutTicker := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout)) + timeoutTimer := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout)) + defer timeoutTimer.Stop() select { case <-downloadingTorrent.GotInfo(): - downloadingTorrent.Drop() out <- Result{op, nil, downloadingTorrent.Info()} break - case <-timeoutTicker.C: - downloadingTorrent.Drop() + case <-timeoutTimer.C: out <- Result{op, errors.New("Timeout"), nil} break case <-op.done: - downloadingTorrent.Drop() break } } + diff --git a/service/torrent/filesizeFetcher/operation_test.go b/service/torrent/metainfoFetcher/operation_test.go similarity index 93% rename from service/torrent/filesizeFetcher/operation_test.go rename to service/torrent/metainfoFetcher/operation_test.go index 21e36faa..1be73474 100644 --- a/service/torrent/filesizeFetcher/operation_test.go +++ b/service/torrent/metainfoFetcher/operation_test.go @@ -1,4 +1,4 @@ -package filesizeFetcher; +package metainfoFetcher; import ( "testing" @@ -13,7 +13,7 @@ func TestInvalidHash(t *testing.T) { t.Skipf("Failed to create client, with err %v. Skipping.", err) } - fetcher := &FilesizeFetcher{ + fetcher := &MetainfoFetcher{ timeout: 5, torrentClient: client, results: make(chan Result, 1), diff --git a/service/torrent/torrent.go b/service/torrent/torrent.go index d6d7cb0a..ae909a1b 100644 --- a/service/torrent/torrent.go +++ b/service/torrent/torrent.go @@ -53,7 +53,7 @@ func GetTorrentById(id string) (torrent model.Torrent, err error) { return } - tmp := db.ORM.Where("torrent_id = ?", id).Preload("Comments") + tmp := db.ORM.Where("torrent_id = ?", id).Preload("Comments").Preload("FileList") err = tmp.Error if err != nil { return diff --git a/templates/view.html b/templates/view.html index 2dc8999c..8ece64eb 100644 --- a/templates/view.html +++ b/templates/view.html @@ -82,6 +82,27 @@
+ {{ if gt (len .FileList) 0 }} +
+
+

{{T "files"}}

+
+ + + + + + {{ range .FileList }} + + + + + {{ end }} +
{{T "filename"}}{{T "size"}}
{{.Path}}{{.Filesize}}
+
+
+
+ {{ end }}
diff --git a/translations/en-us.all.json b/translations/en-us.all.json index dea88612..37faa78c 100644 --- a/translations/en-us.all.json +++ b/translations/en-us.all.json @@ -647,6 +647,14 @@ "id": "delete", "translation": "Delete" }, + { + "id": "files", + "translation": "Files" + }, + { + "id": "filename", + "translation": "Filename" + }, { "id": "uploaded_by", "translation": "Uploaded by"