package metainfoFetcher

import (
	"sync"
	"time"

	"github.com/NyaaPantsu/nyaa/config"
	"github.com/NyaaPantsu/nyaa/db"
	"github.com/NyaaPantsu/nyaa/model"
	serviceBase "github.com/NyaaPantsu/nyaa/service"
	torrentService "github.com/NyaaPantsu/nyaa/service/torrent"
	"github.com/NyaaPantsu/nyaa/util/log"

	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/metainfo"
	"golang.org/x/time/rate"
)
// MetainfoFetcher runs a background torrent client and fetches metainfo
// (total size and file list) for torrents in the database.
type MetainfoFetcher struct {
	torrentClient    *torrent.Client
	results          chan Result
	queueSize        int
	timeout          int
	maxDays          int
	baseFailCooldown int
	maxFailCooldown  int
	newTorrentsOnly  bool
	done             chan struct{}
	queue            []*FetchOperation
	queueMutex       sync.Mutex
	failedOperations map[uint]time.Time
	numFails         map[uint]int
	failsMutex       sync.Mutex
	wakeUp           *time.Ticker
	wg               sync.WaitGroup
}
// New creates a MetainfoFetcher from the given configuration.
func New(fetcherConfig *config.MetainfoFetcherConfig) (*MetainfoFetcher, error) {
	clientConfig := torrent.Config{}

	// Convert the configured speeds (KiB/s) into rate.Limiters, following the upstream example:
	// https://github.com/anacrolix/torrent/blob/master/cmd/torrent/main.go
	const uploadBurst = 0x40000    // 256 KiB
	const downloadBurst = 0x100000 // 1 MiB
	uploadLimit := fetcherConfig.UploadRateLimitKiB * 1024
	downloadLimit := fetcherConfig.DownloadRateLimitKiB * 1024
	if uploadLimit > 0 {
		limit := rate.Limit(uploadLimit)
		limiter := rate.NewLimiter(limit, uploadBurst)
		clientConfig.UploadRateLimiter = limiter
	}
	if downloadLimit > 0 {
		limit := rate.Limit(downloadLimit)
		limiter := rate.NewLimiter(limit, downloadBurst)
		clientConfig.DownloadRateLimiter = limiter
	}

	client, err := torrent.NewClient(&clientConfig)
	if err != nil {
		return nil, err
	}

	fetcher := &MetainfoFetcher{
		torrentClient:    client,
		results:          make(chan Result, fetcherConfig.QueueSize),
		queueSize:        fetcherConfig.QueueSize,
		timeout:          fetcherConfig.Timeout,
		maxDays:          fetcherConfig.MaxDays,
		newTorrentsOnly:  fetcherConfig.FetchNewTorrentsOnly,
		baseFailCooldown: fetcherConfig.BaseFailCooldown,
		maxFailCooldown:  fetcherConfig.MaxFailCooldown,
		done:             make(chan struct{}, 1),
		failedOperations: make(map[uint]time.Time),
		numFails:         make(map[uint]int),
		wakeUp:           time.NewTicker(time.Second * time.Duration(fetcherConfig.WakeUpInterval)),
	}

	return fetcher, nil
}
func (fetcher *MetainfoFetcher) addToQueue(op *FetchOperation) bool {
	fetcher.queueMutex.Lock()
	defer fetcher.queueMutex.Unlock()

	if fetcher.queueSize > 0 && len(fetcher.queue) >= fetcher.queueSize {
		return false
	}

	fetcher.queue = append(fetcher.queue, op)
	return true
}
func (fetcher *MetainfoFetcher) removeFromQueue(op *FetchOperation) bool {
	fetcher.queueMutex.Lock()
	defer fetcher.queueMutex.Unlock()

	for i, queueOP := range fetcher.queue {
		if queueOP == op {
			fetcher.queue = append(fetcher.queue[:i], fetcher.queue[i+1:]...)
			return true
		}
	}

	return false
}
func (fetcher *MetainfoFetcher) markAsFailed(tID uint) {
	fetcher.failsMutex.Lock()
	defer fetcher.failsMutex.Unlock()

	fetcher.numFails[tID]++
	fetcher.failedOperations[tID] = time.Now()
}

func (fetcher *MetainfoFetcher) removeFromFailed(tID uint) {
	fetcher.failsMutex.Lock()
	defer fetcher.failsMutex.Unlock()

	delete(fetcher.failedOperations, tID)
}
func updateFileList(dbEntry model.Torrent, info *metainfo.Info) error {
	torrentFiles := info.UpvertedFiles()
	log.Infof("TID %d has %d files.", dbEntry.ID, len(torrentFiles))

	for _, file := range torrentFiles {
		var path []string
		if file.Path != nil {
			path = file.Path
		} else {
			// Single-file torrents have no per-file path; use the torrent name (info.Name) instead.
			path = append(path, info.Name)
		}

		// We can't read FileList from the GetTorrents output, so we rely on the
		// unique index to ensure no files are duplicated.
		log.Infof("Adding file %s to filelist of TID %d", file.DisplayPath(info), dbEntry.ID)
		dbFile := model.File{
			TorrentID: dbEntry.ID,
			Filesize:  file.Length,
		}

		err := dbFile.SetPath(path)
		if err != nil {
			return err
		}

		err = db.ORM.Create(&dbFile).Error
		if err != nil {
			return err
		}
	}

	return nil
}
func (fetcher *MetainfoFetcher) gotResult(r Result) {
	updatedSuccessfully := false
	if r.err != nil {
		log.Infof("Failed to get torrent metainfo (TID: %d), err %v", r.operation.torrent.ID, r.err)
	} else if r.info.TotalLength() == 0 {
		log.Infof("Got length 0 for torrent TID: %d. Possible bug?", r.operation.torrent.ID)
	} else {
		lengthOK := true
		if r.operation.torrent.Filesize != r.info.TotalLength() {
			log.Infof("Got length %d for torrent TID: %d. Updating.", r.info.TotalLength(), r.operation.torrent.ID)
			r.operation.torrent.Filesize = r.info.TotalLength()
			_, err := torrentService.UpdateTorrent(r.operation.torrent)
			if err != nil {
				log.Infof("Failed to update torrent TID: %d with new filesize", r.operation.torrent.ID)
				lengthOK = false
			}
		}

		filelistOK := true
		// Create the file list, if it's missing.
		if len(r.operation.torrent.FileList) == 0 {
			err := updateFileList(r.operation.torrent, r.info)
			if err != nil {
				log.Infof("Failed to update file list of TID %d", r.operation.torrent.ID)
				filelistOK = false
			}
		}

		updatedSuccessfully = lengthOK && filelistOK
	}

	if !updatedSuccessfully {
		fetcher.markAsFailed(r.operation.torrent.ID)
	}

	fetcher.removeFromQueue(r.operation)
}
func (fetcher *MetainfoFetcher) removeOldFailures() {
	if fetcher.baseFailCooldown < 0 {
		// XXX: A negative baseFailCooldown disables the cooldown entirely.
		// Any torrent whose fetch fails is then never retried, and
		// fetcher.failedOperations keeps accumulating torrent IDs that are never freed.
		return
	}

	max := time.Duration(fetcher.maxFailCooldown) * time.Second
	now := time.Now()
	for id, failTime := range fetcher.failedOperations {
		// The cooldown doubles with each additional failure, capped at maxFailCooldown:
		// | numFails | cooldown
		// | 1        | 1 * base
		// | 2        | 2 * base
		// | 3        | 4 * base
		// Values in fetcher.numFails are always >= 1, so the shift below is safe.
		mul := 1 << uint(fetcher.numFails[id]-1)
		cd := time.Duration(mul*fetcher.baseFailCooldown) * time.Second
		if cd > max {
			cd = max
		}

		if failTime.Add(cd).Before(now) {
			log.Infof("Torrent TID %d gone through cooldown, removing from failures", id)
			// Deleting map entries while ranging over the map is safe in Go.
			fetcher.removeFromFailed(id)
		}
	}
}
func (fetcher *MetainfoFetcher) fillQueue() {
	if left := fetcher.queueSize - len(fetcher.queue); left <= 0 {
		// The queue is already full.
		return
	}

	oldest := time.Now().Add(-time.Duration(24*fetcher.maxDays) * time.Hour)
	excludedIDS := make([]uint, 0, len(fetcher.failedOperations))
	for id := range fetcher.failedOperations {
		excludedIDS = append(excludedIDS, id)
	}

	tFiles := config.Conf.Models.FilesTableName
	tTorrents := config.Conf.Models.TorrentsTableName

	// Select the torrents with no filesize, or without any matching rows in the files table...
	queryString := "((filesize IS NULL OR filesize = 0) OR (" + tTorrents + ".torrent_id NOT " +
		"IN (SELECT " + tFiles + ".torrent_id FROM " + tFiles + " WHERE " + tFiles +
		".torrent_id = " + tTorrents + ".torrent_id)))"

	var whereParamsArgs []interface{}

	// ...that are newer than maxDays...
	queryString += " AND date > ? "
	whereParamsArgs = append(whereParamsArgs, oldest)

	// ...that didn't fail recently...
	if len(excludedIDS) > 0 {
		queryString += " AND torrent_id NOT IN (?) "
		whereParamsArgs = append(whereParamsArgs, excludedIDS)
	}

	// ...and, if configured, that aren't from the old Nyaa database.
	if fetcher.newTorrentsOnly {
		queryString += " AND torrent_id > ? "
		whereParamsArgs = append(whereParamsArgs, config.Conf.Models.LastOldTorrentID)
	}

	params := serviceBase.CreateWhereParams(queryString, whereParamsArgs...)
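
	// Illustrative only: assuming the configured table names are e.g. "torrents"
	// and "files", the WHERE clause built above ends up looking roughly like:
	//   ((filesize IS NULL OR filesize = 0)
	//     OR (torrents.torrent_id NOT IN
	//         (SELECT files.torrent_id FROM files WHERE files.torrent_id = torrents.torrent_id)))
	//   AND date > ? AND torrent_id NOT IN (?) AND torrent_id > ?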
	dbTorrents, err := torrentService.GetTorrentsOrderByNoCount(&params, "", fetcher.queueSize, 0)
	if err != nil {
		log.Infof("Failed to get torrents for metainfo updating")
		return
	}
	if len(dbTorrents) == 0 {
		log.Infof("No torrents for filesize update")
		return
	}

	for _, T := range dbTorrents {
		// Skip torrents that are already queued for fetching.
		alreadyQueued := false
		for _, op := range fetcher.queue {
			if op.torrent.ID == T.ID {
				alreadyQueued = true
				break
			}
		}
		if alreadyQueued {
			continue
		}

		if _, ok := fetcher.failedOperations[T.ID]; ok {
			// Do not start new jobs for torrents that failed recently; they are on
			// cooldown and will be removed from fetcher.failedOperations when their
			// time is up.
			continue
		}

		operation := NewFetchOperation(fetcher, T)
		if !fetcher.addToQueue(operation) {
			// The queue is full; stop and wait for results.
			break
		}

		log.Infof("Added TID %d for filesize fetching", T.ID)
		fetcher.wg.Add(1)
		go operation.Start(fetcher.results)
	}
}
func (fetcher *MetainfoFetcher) run() {
	var result Result

	defer fetcher.wg.Done()

	for {
		fetcher.removeOldFailures()
		fetcher.fillQueue()

		select {
		case <-fetcher.done:
			log.Infof("Got done signal on main loop, leaving...")
			return
		case result = <-fetcher.results:
			fetcher.gotResult(result)
		case <-fetcher.wakeUp.C:
			log.Infof("Got wake up signal...")
		}
	}
}
// RunAsync starts the fetcher's main loop in its own goroutine.
func (fetcher *MetainfoFetcher) RunAsync() {
	fetcher.wg.Add(1)
	go fetcher.run()
}
// Close signals every running operation and the main loop to stop, closes the
// torrent client, and waits for everything to finish.
func (fetcher *MetainfoFetcher) Close() error {
	fetcher.queueMutex.Lock()
	defer fetcher.queueMutex.Unlock()

	// Send the done event to every operation.
	for _, op := range fetcher.queue {
		op.done <- struct{}{}
	}

	fetcher.done <- struct{}{}
	fetcher.torrentClient.Close()
	log.Infof("Sent done signal to everyone, waiting...")
	fetcher.wg.Wait()

	return nil
}
// Wait blocks until the fetcher's main loop and all pending operations have finished.
func (fetcher *MetainfoFetcher) Wait() {
	fetcher.wg.Wait()
}
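
// Usage sketch (illustrative, not part of the original file): a caller that
// already holds a *config.MetainfoFetcherConfig, here called fetcherConf
// (a hypothetical variable), would typically wire the fetcher up like this:
//
//	fetcher, err := metainfoFetcher.New(fetcherConf)
//	if err != nil {
//		// handle the error
//	}
//	fetcher.RunAsync()
//	defer fetcher.Close()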