2017-05-14 13:20:34 +02:00
package metainfoFetcher ;
2017-05-13 19:58:48 +02:00
import (
"github.com/anacrolix/torrent"
2017-05-14 04:12:18 +02:00
"github.com/anacrolix/torrent/metainfo"
2017-05-13 19:58:48 +02:00
"github.com/ewhal/nyaa/config"
2017-05-14 04:12:18 +02:00
"github.com/ewhal/nyaa/db"
2017-05-13 19:58:48 +02:00
"github.com/ewhal/nyaa/model"
"github.com/ewhal/nyaa/util/log"
serviceBase "github.com/ewhal/nyaa/service"
torrentService "github.com/ewhal/nyaa/service/torrent"
2017-05-14 16:35:03 +02:00
"golang.org/x/time/rate"
2017-05-15 00:30:56 +02:00
"math"
2017-05-13 19:58:48 +02:00
"sync"
"time"
)
2017-05-14 13:20:34 +02:00
type MetainfoFetcher struct {
2017-05-13 19:58:48 +02:00
torrentClient * torrent . Client
results chan Result
queueSize int
timeout int
maxDays int
2017-05-15 00:30:56 +02:00
baseFailCooldown int
maxFailCooldown int
2017-05-13 19:58:48 +02:00
done chan int
queue [ ] * FetchOperation
queueMutex sync . Mutex
2017-05-14 23:10:39 +02:00
failedOperations map [ uint ] time . Time
2017-05-15 00:30:56 +02:00
numFails map [ uint ] int
failsMutex sync . Mutex
2017-05-13 19:58:48 +02:00
wakeUp * time . Ticker
2017-05-13 21:07:39 +02:00
wg sync . WaitGroup
2017-05-13 19:58:48 +02:00
}
2017-05-14 13:20:34 +02:00
func New ( fetcherConfig * config . MetainfoFetcherConfig ) ( fetcher * MetainfoFetcher , err error ) {
2017-05-14 16:35:03 +02:00
clientConfig := torrent . Config { }
// Well, it seems this is the right way to convert speed -> rate.Limiter
// https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go
if fetcherConfig . UploadRateLimiter != - 1 {
clientConfig . UploadRateLimiter = rate . NewLimiter ( rate . Limit ( fetcherConfig . UploadRateLimiter * 1024 ) , 256 << 10 )
}
if fetcherConfig . DownloadRateLimiter != - 1 {
clientConfig . DownloadRateLimiter = rate . NewLimiter ( rate . Limit ( fetcherConfig . DownloadRateLimiter * 1024 ) , 1 << 20 )
}
client , err := torrent . NewClient ( & clientConfig )
2017-05-14 13:20:34 +02:00
fetcher = & MetainfoFetcher {
2017-05-13 19:58:48 +02:00
torrentClient : client ,
2017-05-14 00:35:35 +02:00
results : make ( chan Result , fetcherConfig . QueueSize ) ,
2017-05-13 19:58:48 +02:00
queueSize : fetcherConfig . QueueSize ,
timeout : fetcherConfig . Timeout ,
maxDays : fetcherConfig . MaxDays ,
2017-05-15 00:30:56 +02:00
baseFailCooldown : fetcherConfig . BaseFailCooldown ,
maxFailCooldown : fetcherConfig . MaxFailCooldown ,
2017-05-14 00:35:35 +02:00
done : make ( chan int , 1 ) ,
2017-05-14 23:10:39 +02:00
failedOperations : make ( map [ uint ] time . Time ) ,
2017-05-15 00:30:56 +02:00
numFails : make ( map [ uint ] int ) ,
2017-05-13 19:58:48 +02:00
wakeUp : time . NewTicker ( time . Second * time . Duration ( fetcherConfig . WakeUpInterval ) ) ,
}
return
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) isFetchingOrFailed ( t model . Torrent ) bool {
2017-05-13 19:58:48 +02:00
for _ , op := range fetcher . queue {
if op . torrent . ID == t . ID {
return true
}
}
_ , ok := fetcher . failedOperations [ t . ID ]
return ok
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) addToQueue ( op * FetchOperation ) bool {
2017-05-13 19:58:48 +02:00
fetcher . queueMutex . Lock ( )
defer fetcher . queueMutex . Unlock ( )
if len ( fetcher . queue ) + 1 > fetcher . queueSize {
return false
}
fetcher . queue = append ( fetcher . queue , op )
return true
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) removeFromQueue ( op * FetchOperation ) bool {
2017-05-13 19:58:48 +02:00
fetcher . queueMutex . Lock ( )
defer fetcher . queueMutex . Unlock ( )
for i , queueOP := range fetcher . queue {
if queueOP == op {
fetcher . queue = append ( fetcher . queue [ : i ] , fetcher . queue [ i + 1 : ] ... )
return true
}
}
return false
}
2017-05-15 00:30:56 +02:00
// markAsFailed records a failed fetch for torrent tID. It stamps the failure
// time and increments the torrent's failure count; removeOldFailures uses both
// to compute the retry cooldown.
func (fetcher *MetainfoFetcher) markAsFailed(tID uint) {
	fetcher.failsMutex.Lock()
	defer fetcher.failsMutex.Unlock()
	fetcher.failedOperations[tID] = time.Now()
	// A missing key reads as 0, so the first failure stores 1.
	fetcher.numFails[tID]++
}
// removeFromFailed takes torrent tID off the failure list. Its count in
// numFails is left untouched, so if the torrent fails again removeOldFailures
// computes a longer cooldown for it.
func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) {
	fetcher.failsMutex.Lock()
	delete(fetcher.failedOperations, tID)
	fetcher.failsMutex.Unlock()
}
2017-05-14 04:12:18 +02:00
func updateFileList ( dbEntry model . Torrent , info * metainfo . Info ) error {
2017-05-14 14:05:20 +02:00
torrentFiles := info . UpvertedFiles ( )
log . Infof ( "TID %d has %d files." , dbEntry . ID , len ( torrentFiles ) )
for _ , file := range torrentFiles {
2017-05-14 04:12:18 +02:00
path := file . DisplayPath ( info )
2017-05-14 16:35:03 +02:00
// Can't read FileList from the GetTorrents output, rely on the unique_index
// to ensure no files are duplicated.
log . Infof ( "Adding file %s to filelist of TID %d" , path , dbEntry . ID )
dbFile := model . File {
TorrentID : dbEntry . ID ,
Path : path ,
Filesize : file . Length ,
}
2017-05-14 04:12:18 +02:00
2017-05-14 16:35:03 +02:00
err := db . ORM . Create ( & dbFile ) . Error
if err != nil {
return err
2017-05-14 04:12:18 +02:00
}
}
return nil
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) gotResult ( r Result ) {
2017-05-13 19:58:48 +02:00
updatedSuccessfully := false
if r . err != nil {
2017-05-14 14:05:20 +02:00
log . Infof ( "Failed to get torrent metainfo (TID: %d), err %v" , r . operation . torrent . ID , r . err )
2017-05-14 04:12:18 +02:00
} else if r . info . TotalLength ( ) == 0 {
2017-05-13 19:58:48 +02:00
log . Infof ( "Got length 0 for torrent TID: %d. Possible bug?" , r . operation . torrent . ID )
} else {
2017-05-14 16:47:13 +02:00
lengthOK := true
if r . operation . torrent . Filesize != r . info . TotalLength ( ) {
log . Infof ( "Got length %d for torrent TID: %d. Updating." , r . info . TotalLength ( ) , r . operation . torrent . ID )
r . operation . torrent . Filesize = r . info . TotalLength ( )
_ , err := torrentService . UpdateTorrent ( r . operation . torrent )
if err != nil {
log . Infof ( "Failed to update torrent TID: %d with new filesize" , r . operation . torrent . ID )
lengthOK = false
}
2017-05-13 19:58:48 +02:00
}
2017-05-14 04:12:18 +02:00
2017-05-14 16:47:13 +02:00
filelistOK := true
2017-05-14 14:05:20 +02:00
// Create the file list, if it's missing.
if len ( r . operation . torrent . FileList ) == 0 {
2017-05-14 16:47:13 +02:00
err := updateFileList ( r . operation . torrent , r . info )
2017-05-14 14:05:20 +02:00
if err != nil {
log . Infof ( "Failed to update file list of TID %d" , r . operation . torrent . ID )
2017-05-14 16:47:13 +02:00
filelistOK = false
2017-05-14 14:05:20 +02:00
}
2017-05-14 04:12:18 +02:00
}
2017-05-14 16:47:13 +02:00
updatedSuccessfully = lengthOK && filelistOK
2017-05-13 19:58:48 +02:00
}
if ! updatedSuccessfully {
2017-05-15 00:30:56 +02:00
fetcher . markAsFailed ( r . operation . torrent . ID )
2017-05-13 19:58:48 +02:00
}
fetcher . removeFromQueue ( r . operation )
}
2017-05-14 23:10:39 +02:00
func ( fetcher * MetainfoFetcher ) removeOldFailures ( ) {
// Cooldown is disabled
2017-05-15 00:30:56 +02:00
if fetcher . baseFailCooldown < 0 {
2017-05-14 23:10:39 +02:00
return
}
2017-05-15 00:30:56 +02:00
maxCd := time . Duration ( fetcher . maxFailCooldown ) * time . Second
2017-05-14 23:10:39 +02:00
now := time . Now ( )
for id , failTime := range fetcher . failedOperations {
2017-05-15 00:30:56 +02:00
cdMult := int ( math . Pow ( 2 , float64 ( fetcher . numFails [ id ] - 1 ) ) )
cd := time . Duration ( cdMult * fetcher . baseFailCooldown ) * time . Second
if cd > maxCd {
cd = maxCd
}
if failTime . Add ( cd ) . Before ( now ) {
log . Infof ( "Torrent TID %d gone through cooldown, removing from failures" , id )
2017-05-14 23:10:39 +02:00
// Deleting keys inside a loop seems to be safe.
2017-05-15 00:30:56 +02:00
fetcher . removeFromFailed ( id )
2017-05-14 23:10:39 +02:00
}
}
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) fillQueue ( ) {
2017-05-13 19:58:48 +02:00
toFill := fetcher . queueSize - len ( fetcher . queue )
if toFill <= 0 {
return
}
oldest := time . Now ( ) . Add ( 0 - ( time . Hour * time . Duration ( 24 * fetcher . maxDays ) ) )
2017-05-14 21:47:48 +02:00
excludedIDS := make ( [ ] uint , 0 , len ( fetcher . failedOperations ) )
for id , _ := range fetcher . failedOperations {
excludedIDS = append ( excludedIDS , id )
}
2017-05-14 14:05:20 +02:00
// Nice query lol
// Select the torrents with no filesize, or without any rows with torrent_id in the files table, that are younger than fetcher.MaxDays
2017-05-14 21:47:48 +02:00
var params serviceBase . WhereParams
if len ( excludedIDS ) > 0 {
params = serviceBase . CreateWhereParams ( "((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ? AND torrent_id NOT IN (?)" , oldest , excludedIDS )
} else {
params = serviceBase . CreateWhereParams ( "((filesize IS NULL OR filesize = 0) OR (torrents.torrent_id NOT IN (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id))) AND date > ?" , oldest )
}
2017-05-14 23:24:41 +02:00
dbTorrents , err := torrentService . GetTorrentsOrderByNoCount ( & params , "" , fetcher . queueSize , 0 )
2017-05-13 19:58:48 +02:00
2017-05-14 23:24:41 +02:00
if len ( dbTorrents ) == 0 {
2017-05-14 23:10:39 +02:00
log . Infof ( "No torrents for filesize update" )
2017-05-13 19:58:48 +02:00
return
}
2017-05-14 23:10:39 +02:00
if err != nil {
log . Infof ( "Failed to get torrents for metainfo updating" )
2017-05-13 19:58:48 +02:00
return
}
2017-05-14 23:10:39 +02:00
2017-05-13 19:58:48 +02:00
for _ , T := range dbTorrents {
if fetcher . isFetchingOrFailed ( T ) {
continue
}
operation := NewFetchOperation ( fetcher , T )
if fetcher . addToQueue ( operation ) {
2017-05-15 02:21:04 +02:00
log . Infof ( "Added TID %d for filesize fetching" , T . ID )
2017-05-13 21:07:39 +02:00
fetcher . wg . Add ( 1 )
2017-05-13 19:58:48 +02:00
go operation . Start ( fetcher . results )
} else {
break
}
}
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) run ( ) {
2017-05-13 19:58:48 +02:00
var result Result
2017-05-13 21:07:39 +02:00
defer fetcher . wg . Done ( )
2017-05-13 19:58:48 +02:00
done := 0
fetcher . fillQueue ( )
for done == 0 {
2017-05-14 23:10:39 +02:00
fetcher . removeOldFailures ( )
fetcher . fillQueue ( )
2017-05-13 19:58:48 +02:00
select {
case done = <- fetcher . done :
break
case result = <- fetcher . results :
fetcher . gotResult ( result )
break
case <- fetcher . wakeUp . C :
break
}
}
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) RunAsync ( ) {
2017-05-13 21:07:39 +02:00
fetcher . wg . Add ( 1 )
go fetcher . run ( )
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) Close ( ) error {
2017-05-13 19:58:48 +02:00
fetcher . queueMutex . Lock ( )
defer fetcher . queueMutex . Unlock ( )
// Send the done event to every Operation
for _ , op := range fetcher . queue {
op . done <- 1
}
fetcher . done <- 1
2017-05-14 23:10:39 +02:00
fetcher . torrentClient . Close ( )
2017-05-14 16:35:03 +02:00
log . Infof ( "Send done signal to everyone, waiting..." )
2017-05-13 21:07:39 +02:00
fetcher . wg . Wait ( )
2017-05-13 19:58:48 +02:00
return nil
}
2017-05-14 13:20:34 +02:00
func ( fetcher * MetainfoFetcher ) Wait ( ) {
2017-05-13 21:07:39 +02:00
fetcher . wg . Wait ( )
}