Add filesize fetcher mode
Cette révision appartient à :
Parent
1632c4b455
révision
a068984af3
6 fichiers modifiés avec 268 ajouts et 2 suppressions
|
@ -31,9 +31,11 @@ type Config struct {
|
||||||
Search SearchConfig `json:"search"`
|
Search SearchConfig `json:"search"`
|
||||||
// optional i2p configuration
|
// optional i2p configuration
|
||||||
I2P *I2PConfig `json:"i2p"`
|
I2P *I2PConfig `json:"i2p"`
|
||||||
|
// filesize fetcher config
|
||||||
|
FilesizeFetcher FilesizeFetcherConfig `json:"filesize_fetcher"`
|
||||||
}
|
}
|
||||||
|
|
||||||
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil}
|
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", "default", DefaultScraperConfig, DefaultCacheConfig, DefaultSearchConfig, nil, DefaultFilesizeFetcherConfig}
|
||||||
|
|
||||||
var allowedDatabaseTypes = map[string]bool{
|
var allowedDatabaseTypes = map[string]bool{
|
||||||
"sqlite3": true,
|
"sqlite3": true,
|
||||||
|
@ -57,6 +59,7 @@ func New() *Config {
|
||||||
config.DBLogMode = Defaults.DBLogMode
|
config.DBLogMode = Defaults.DBLogMode
|
||||||
config.Scrape = Defaults.Scrape
|
config.Scrape = Defaults.Scrape
|
||||||
config.Cache = Defaults.Cache
|
config.Cache = Defaults.Cache
|
||||||
|
config.FilesizeFetcher = Defaults.FilesizeFetcher
|
||||||
return &config
|
return &config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
16
config/filesizeFetcher.go
Fichier normal
16
config/filesizeFetcher.go
Fichier normal
|
@ -0,0 +1,16 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
type FilesizeFetcherConfig struct {
|
||||||
|
QueueSize int `json:"queue_size"`
|
||||||
|
Timeout int `json:"timeout"`
|
||||||
|
MaxDays int `json:"max_days"`
|
||||||
|
WakeUpInterval int `json:"wake_up_interval"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultFilesizeFetcherConfig = FilesizeFetcherConfig{
|
||||||
|
QueueSize: 10,
|
||||||
|
Timeout: 120, // 2 min
|
||||||
|
MaxDays: 90,
|
||||||
|
WakeUpInterval: 300, // 5 min
|
||||||
|
}
|
||||||
|
|
15
main.go
15
main.go
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/ewhal/nyaa/network"
|
"github.com/ewhal/nyaa/network"
|
||||||
"github.com/ewhal/nyaa/router"
|
"github.com/ewhal/nyaa/router"
|
||||||
"github.com/ewhal/nyaa/service/scraper"
|
"github.com/ewhal/nyaa/service/scraper"
|
||||||
|
"github.com/ewhal/nyaa/service/torrent/filesizeFetcher"
|
||||||
"github.com/ewhal/nyaa/util/log"
|
"github.com/ewhal/nyaa/util/log"
|
||||||
"github.com/ewhal/nyaa/util/search"
|
"github.com/ewhal/nyaa/util/search"
|
||||||
"github.com/ewhal/nyaa/util/signals"
|
"github.com/ewhal/nyaa/util/signals"
|
||||||
|
@ -94,6 +95,18 @@ func RunScraper(conf *config.Config) {
|
||||||
scraper.Wait()
|
scraper.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RunFilesizeFetcher runs the database filesize fetcher main loop
|
||||||
|
func RunFilesizeFetcher(conf *config.Config) {
|
||||||
|
fetcher, err := filesizeFetcher.New(&conf.FilesizeFetcher)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to start fetcher, %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
signals.RegisterCloser(fetcher)
|
||||||
|
fetcher.Run()
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
conf := config.New()
|
conf := config.New()
|
||||||
processFlags := conf.BindFlags()
|
processFlags := conf.BindFlags()
|
||||||
|
@ -142,6 +155,8 @@ func main() {
|
||||||
RunScraper(conf)
|
RunScraper(conf)
|
||||||
} else if *mode == "webapp" {
|
} else if *mode == "webapp" {
|
||||||
RunServer(conf)
|
RunServer(conf)
|
||||||
|
} else if *mode == "filesize_fetcher" {
|
||||||
|
RunFilesizeFetcher(conf)
|
||||||
} else {
|
} else {
|
||||||
log.Fatalf("invalid runtime mode: %s", *mode)
|
log.Fatalf("invalid runtime mode: %s", *mode)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ type WhereParams struct {
|
||||||
Params []interface{}
|
Params []interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateWhereParams(conditions string, params ...string) WhereParams {
|
func CreateWhereParams(conditions string, params ...interface{}) WhereParams {
|
||||||
whereParams := WhereParams{
|
whereParams := WhereParams{
|
||||||
Conditions: conditions,
|
Conditions: conditions,
|
||||||
Params: make([]interface{}, len(params)),
|
Params: make([]interface{}, len(params)),
|
||||||
|
|
174
service/torrent/filesizeFetcher/filesizeFetcher.go
Fichier normal
174
service/torrent/filesizeFetcher/filesizeFetcher.go
Fichier normal
|
@ -0,0 +1,174 @@
|
||||||
|
package filesizeFetcher;
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/anacrolix/torrent"
|
||||||
|
"github.com/ewhal/nyaa/config"
|
||||||
|
"github.com/ewhal/nyaa/model"
|
||||||
|
"github.com/ewhal/nyaa/util/log"
|
||||||
|
serviceBase "github.com/ewhal/nyaa/service"
|
||||||
|
torrentService "github.com/ewhal/nyaa/service/torrent"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FilesizeFetcher struct {
|
||||||
|
torrentClient *torrent.Client
|
||||||
|
results chan Result
|
||||||
|
queueSize int
|
||||||
|
timeout int
|
||||||
|
maxDays int
|
||||||
|
done chan int
|
||||||
|
queue []*FetchOperation
|
||||||
|
queueMutex sync.Mutex
|
||||||
|
failedOperations map[uint]struct{}
|
||||||
|
wakeUp *time.Ticker
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(fetcherConfig *config.FilesizeFetcherConfig) (fetcher *FilesizeFetcher, err error) {
|
||||||
|
client, err := torrent.NewClient(nil)
|
||||||
|
fetcher = &FilesizeFetcher{
|
||||||
|
torrentClient: client,
|
||||||
|
results: make(chan Result),
|
||||||
|
queueSize: fetcherConfig.QueueSize,
|
||||||
|
timeout: fetcherConfig.Timeout,
|
||||||
|
maxDays: fetcherConfig.MaxDays,
|
||||||
|
done: make(chan int),
|
||||||
|
failedOperations: make(map[uint]struct{}),
|
||||||
|
wakeUp: time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) isFetchingOrFailed(t model.Torrent) bool {
|
||||||
|
for _, op := range fetcher.queue {
|
||||||
|
if op.torrent.ID == t.ID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := fetcher.failedOperations[t.ID]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) addToQueue(op *FetchOperation) bool {
|
||||||
|
fetcher.queueMutex.Lock()
|
||||||
|
defer fetcher.queueMutex.Unlock()
|
||||||
|
|
||||||
|
if len(fetcher.queue) + 1 > fetcher.queueSize {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
fetcher.queue = append(fetcher.queue, op)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) removeFromQueue(op *FetchOperation) bool {
|
||||||
|
fetcher.queueMutex.Lock()
|
||||||
|
defer fetcher.queueMutex.Unlock()
|
||||||
|
|
||||||
|
for i, queueOP := range fetcher.queue {
|
||||||
|
if queueOP == op {
|
||||||
|
fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) gotResult(r Result) {
|
||||||
|
updatedSuccessfully := false
|
||||||
|
if r.err != nil {
|
||||||
|
log.Infof("Failed to get torrent filesize (TID: %d), err ", r.operation.torrent.ID)
|
||||||
|
} else if r.info.Length == 0 {
|
||||||
|
log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID)
|
||||||
|
} else {
|
||||||
|
log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.Length, r.operation.torrent.ID)
|
||||||
|
r.operation.torrent.Filesize = r.info.Length
|
||||||
|
_, err := torrentService.UpdateTorrent(r.operation.torrent)
|
||||||
|
if err != nil {
|
||||||
|
log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID)
|
||||||
|
} else {
|
||||||
|
updatedSuccessfully = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !updatedSuccessfully {
|
||||||
|
fetcher.failedOperations[r.operation.torrent.ID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
fetcher.removeFromQueue(r.operation)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) fillQueue() {
|
||||||
|
toFill := fetcher.queueSize - len(fetcher.queue)
|
||||||
|
|
||||||
|
if toFill <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
oldest := time.Now().Add(0 - (time.Hour * time.Duration(24 * fetcher.maxDays)))
|
||||||
|
params := serviceBase.CreateWhereParams("(filesize IS NULL OR filesize = 0) AND date > ?", oldest)
|
||||||
|
// Get up to queueSize + len(failed) torrents, so we get at least some fresh new ones.
|
||||||
|
dbTorrents, count, err := torrentService.GetTorrents(params, fetcher.queueSize + len(fetcher.failedOperations), 0)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Infof("Failed to get torrents for filesize updating")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if count == 0 {
|
||||||
|
log.Infof("No torrents for filesize update")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, T := range dbTorrents {
|
||||||
|
if fetcher.isFetchingOrFailed(T) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Added TID %d for filesize fetching", T.ID)
|
||||||
|
operation := NewFetchOperation(fetcher, T)
|
||||||
|
|
||||||
|
if fetcher.addToQueue(operation) {
|
||||||
|
go operation.Start(fetcher.results)
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) Run() {
|
||||||
|
var result Result
|
||||||
|
done := 0
|
||||||
|
fetcher.fillQueue()
|
||||||
|
for done == 0 {
|
||||||
|
select {
|
||||||
|
case done = <-fetcher.done:
|
||||||
|
break
|
||||||
|
case result = <-fetcher.results:
|
||||||
|
fetcher.gotResult(result)
|
||||||
|
fetcher.fillQueue()
|
||||||
|
break
|
||||||
|
case <-fetcher.wakeUp.C:
|
||||||
|
fetcher.fillQueue()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *FilesizeFetcher) Close() error {
|
||||||
|
fetcher.queueMutex.Lock()
|
||||||
|
defer fetcher.queueMutex.Unlock()
|
||||||
|
|
||||||
|
// Send the done event to every Operation
|
||||||
|
for _, op := range fetcher.queue {
|
||||||
|
op.done <- 1
|
||||||
|
}
|
||||||
|
|
||||||
|
fetcher.done <- 1
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
58
service/torrent/filesizeFetcher/operation.go
Fichier normal
58
service/torrent/filesizeFetcher/operation.go
Fichier normal
|
@ -0,0 +1,58 @@
|
||||||
|
package filesizeFetcher;
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
|
"github.com/ewhal/nyaa/config"
|
||||||
|
"github.com/ewhal/nyaa/model"
|
||||||
|
"github.com/ewhal/nyaa/util"
|
||||||
|
"errors"
|
||||||
|
"time"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FetchOperation struct {
|
||||||
|
fetcher *FilesizeFetcher
|
||||||
|
torrent model.Torrent
|
||||||
|
done chan int
|
||||||
|
}
|
||||||
|
|
||||||
|
type Result struct {
|
||||||
|
operation *FetchOperation
|
||||||
|
err error
|
||||||
|
info *metainfo.Info
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFetchOperation(fetcher *FilesizeFetcher, dbEntry model.Torrent) (op *FetchOperation) {
|
||||||
|
op = &FetchOperation{
|
||||||
|
fetcher: fetcher,
|
||||||
|
torrent: dbEntry,
|
||||||
|
done: make(chan int),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be started from a goroutine somewhere
|
||||||
|
func (op *FetchOperation) Start(out chan Result) {
|
||||||
|
magnet := util.InfoHashToMagnet(strings.TrimSpace(op.torrent.Hash), op.torrent.Name, config.Trackers...)
|
||||||
|
downloadingTorrent, err := op.fetcher.torrentClient.AddMagnet(magnet)
|
||||||
|
if err != nil {
|
||||||
|
out <- Result{op, err, nil}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
timeoutTicker := time.NewTicker(time.Second * time.Duration(op.fetcher.timeout))
|
||||||
|
select {
|
||||||
|
case <-downloadingTorrent.GotInfo():
|
||||||
|
downloadingTorrent.Drop()
|
||||||
|
out <- Result{op, nil, downloadingTorrent.Info()}
|
||||||
|
return
|
||||||
|
case <-timeoutTicker.C:
|
||||||
|
downloadingTorrent.Drop()
|
||||||
|
out <- Result{op, errors.New("Timeout"), nil}
|
||||||
|
return
|
||||||
|
case <-op.done:
|
||||||
|
downloadingTorrent.Drop()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Référencer dans un nouveau ticket