From 3fd12245ec3e733384b8a730435ec938b13798e4 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 11 May 2017 07:40:50 -0400 Subject: [PATCH] fix torrent swarm ordering and add timeouts for udp scrape --- common/search.go | 4 +++ main.go | 1 + service/scraper/bucket.go | 29 +++++++++++++++++- service/scraper/scraper.go | 54 ++++++++++++++++++++++++++-------- service/scraper/transaction.go | 16 +++++++--- util/search/search.go | 20 +++++++++++-- 6 files changed, 104 insertions(+), 20 deletions(-) diff --git a/common/search.go b/common/search.go index c9679e79..eace727f 100644 --- a/common/search.go +++ b/common/search.go @@ -19,6 +19,9 @@ const ( Date Downloads Size + Seeders + Leechers + Completed ) type Category struct { @@ -44,5 +47,6 @@ type SearchParam struct { Page int UserID uint Max uint + NotNull string Query string } diff --git a/main.go b/main.go index 815b7579..8c92a00c 100644 --- a/main.go +++ b/main.go @@ -84,6 +84,7 @@ func RunScraper(conf *config.Config) { signals.RegisterCloser(scraper) // run udp scraper worker for workers > 0 { + log.Infof("starting up worker %d", workers) go scraper.RunWorker(pc) workers-- } diff --git a/service/scraper/bucket.go b/service/scraper/bucket.go index da301528..d9c85372 100644 --- a/service/scraper/bucket.go +++ b/service/scraper/bucket.go @@ -28,15 +28,42 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) { t = &Transaction{ TransactionID: id, bucket: b, - swarms: swarms, + swarms: make([]model.Torrent, len(swarms)), state: stateSendID, } + copy(t.swarms, swarms) b.transactions[id] = t b.access.Unlock() return } +func (b *Bucket) ForEachTransaction(v func(uint32, *Transaction)) { + + clone := make(map[uint32]*Transaction) + + b.access.Lock() + + for k := range b.transactions { + clone[k] = b.transactions[k] + } + + b.access.Unlock() + + for k := range clone { + v(k, clone[k]) + } +} + +func (b *Bucket) Forget(tid uint32) { + b.access.Lock() + _, ok := b.transactions[tid] + if ok { + delete(b.transactions, tid) + } + b.access.Unlock() +} + func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) { b.access.Lock() t, ok := b.transactions[tid] diff --git a/service/scraper/scraper.go b/service/scraper/scraper.go index 8077f41b..7d4b8e61 100644 --- a/service/scraper/scraper.go +++ b/service/scraper/scraper.go @@ -13,15 +13,20 @@ import ( // MTU yes this is the ipv6 mtu const MTU = 1500 +// max number of scrapes per packet +const ScrapesPerPacket = 74 + // bittorrent scraper type Scraper struct { - done chan int - sendQueue chan *SendEvent - recvQueue chan *RecvEvent - errQueue chan error - trackers map[string]*Bucket - ticker *time.Ticker - interval time.Duration + done chan int + sendQueue chan *SendEvent + recvQueue chan *RecvEvent + errQueue chan error + trackers map[string]*Bucket + ticker *time.Ticker + cleanup *time.Ticker + interval time.Duration + PacketsPerSecond uint } func New(conf *config.ScraperConfig) (sc *Scraper, err error) { @@ -33,7 +38,13 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) { trackers: make(map[string]*Bucket), ticker: time.NewTicker(time.Second), interval: time.Second * time.Duration(conf.IntervalSeconds), + cleanup: time.NewTicker(time.Second), } + + if sc.PacketsPerSecond == 0 { + sc.PacketsPerSecond = 10 + } + for idx := range conf.Trackers { err = sc.AddTracker(&conf.Trackers[idx]) if err != nil { @@ -144,20 +155,37 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) { func (sc *Scraper) Run() { for { - <-sc.ticker.C - sc.Scrape() + select { + case <-sc.ticker.C: + sc.Scrape(sc.PacketsPerSecond) + break + case <-sc.cleanup.C: + sc.removeStale() + break + } } } -func (sc *Scraper) Scrape() { +func (sc *Scraper) removeStale() { + + for k := range sc.trackers { + sc.trackers[k].ForEachTransaction(func(tid uint32, t *Transaction) { + if t == nil || t.IsTimedOut() { + sc.trackers[k].Forget(tid) + } + }) + } +} + +func (sc *Scraper) Scrape(packets uint) { now := time.Now().Add(0 - sc.interval) - rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT 700", now).Rows() + rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT ?", now, packets*ScrapesPerPacket).Rows() if err == nil { counter := 0 - var scrape [70]model.Torrent + var scrape [ScrapesPerPacket]model.Torrent for rows.Next() { - idx := counter % 70 + idx := counter % ScrapesPerPacket rows.Scan(&scrape[idx].ID, &scrape[idx].Hash) counter++ if idx == 0 { diff --git a/service/scraper/transaction.go b/service/scraper/transaction.go index 90ebcf77..7b7de953 100644 --- a/service/scraper/transaction.go +++ b/service/scraper/transaction.go @@ -11,6 +11,9 @@ import ( "github.com/ewhal/nyaa/util/log" ) +// TransactionTimeout 30 second timeout for transactions +const TransactionTimeout = time.Second * 30 + const stateSendID = 0 const stateRecvID = 1 const stateTransact = 2 @@ -27,13 +30,12 @@ type Transaction struct { bucket *Bucket state uint8 swarms []model.Torrent + lastData time.Time } // Done marks this transaction as done and removes it from parent func (t *Transaction) Done() { - t.bucket.access.Lock() - delete(t.bucket.transactions, t.TransactionID) - t.bucket.access.Unlock() + t.bucket.Forget(t.TransactionID) } func (t *Transaction) handleScrapeReply(data []byte) { @@ -95,6 +97,7 @@ func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) { binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID) t.state = stateRecvID } + t.lastData = time.Now() return } @@ -104,7 +107,7 @@ func (t *Transaction) handleError(msg string) { // handle data for transaction func (t *Transaction) GotData(data []byte) (done bool) { - + t.lastData = time.Now() if len(data) > 4 { cmd := binary.BigEndian.Uint32(data) switch cmd { @@ -132,3 +135,8 @@ func (t *Transaction) GotData(data []byte) (done bool) { } return } + +func (t *Transaction) IsTimedOut() bool { + return t.lastData.Add(TransactionTimeout).Before(time.Now()) + +} diff --git a/util/search/search.go b/util/search/search.go index a2e7ec47..ba66540f 100644 --- a/util/search/search.go +++ b/util/search/search.go @@ -40,7 +40,7 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) ( search.Page = pagenum search.Query = r.URL.Query().Get("q") userID, _ := strconv.Atoi(r.URL.Query().Get("userID")) - search.UserID = uint(userID) + search.UserID = uint(userID) switch s := r.URL.Query().Get("s"); s { case "1": @@ -75,22 +75,36 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) ( case "1": search.Sort = common.Name orderBy += "torrent_name" + break case "2": search.Sort = common.Date orderBy += "date" + break case "3": search.Sort = common.Downloads orderBy += "downloads" + break case "4": search.Sort = common.Size orderBy += "filesize" + break case "5": + search.Sort = common.Seeders orderBy += "seeders" + search.NotNull += "seeders IS NOT NULL " + break case "6": + search.Sort = common.Leechers orderBy += "leechers" + search.NotNull += "leechers IS NOT NULL " + break case "7": + search.Sort = common.Completed orderBy += "completed" + search.NotNull += "completed IS NOT NULL " + break default: + search.Sort = common.ID orderBy += "torrent_id" } @@ -129,7 +143,9 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) ( } parameters.Params = append(parameters.Params, strconv.Itoa(int(search.Status)+1)) } - + if len(search.NotNull) > 0 { + conditions = append(conditions, search.NotNull) + } searchQuerySplit := strings.Fields(search.Query) for i, word := range searchQuerySplit { firstRune, _ := utf8.DecodeRuneInString(word)