From 604cf57677894943e470dc85eada1ce8eae37032 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Wed, 10 May 2017 18:06:21 -0400 Subject: [PATCH] make it work --- main.go | 1 - model/torrent.go | 38 +++++++++++++++++----------------- router/apiHandler.go | 2 +- router/rssHandler.go | 6 +++--- router/uploadHandler.go | 2 +- service/scraper/bucket.go | 9 +++++++- service/scraper/scraper.go | 34 ++++++++++++++---------------- service/scraper/transaction.go | 29 ++++++++++++++++++++------ 8 files changed, 70 insertions(+), 51 deletions(-) diff --git a/main.go b/main.go index 8381e429..815b7579 100644 --- a/main.go +++ b/main.go @@ -128,7 +128,6 @@ func main() { } } if *mode == "scraper" { - log.Init("DEVELOPMENT") RunScraper(conf) } else if *mode == "webapp" { RunServer(conf) diff --git a/model/torrent.go b/model/torrent.go index 935dce23..ed577417 100644 --- a/model/torrent.go +++ b/model/torrent.go @@ -20,19 +20,19 @@ type Feed struct { } type Torrent struct { - ID uint `gorm:"column:torrent_id;primary_key"` - Name string `gorm:"column:torrent_name"` - Hash string `gorm:"column:torrent_hash"` - Category int `gorm:"column:category"` - SubCategory int `gorm:"column:sub_category"` - Status int `gorm:"column:status"` - Date int64 `gorm:"column:date"` - UploaderID uint `gorm:"column:uploader"` - Downloads int `gorm:"column:downloads"` - Stardom int `gorm:"column:stardom"` - Filesize int64 `gorm:"column:filesize"` - Description string `gorm:"column:description"` - WebsiteLink string `gorm:"column:website_link"` + ID uint `gorm:"column:torrent_id;primary_key"` + Name string `gorm:"column:torrent_name"` + Hash string `gorm:"column:torrent_hash"` + Category int `gorm:"column:category"` + SubCategory int `gorm:"column:sub_category"` + Status int `gorm:"column:status"` + Date time.Time `gorm:"column:date"` + UploaderID uint `gorm:"column:uploader"` + Downloads int `gorm:"column:downloads"` + Stardom int `gorm:"column:stardom"` + Filesize int64 `gorm:"column:filesize"` + Description string `gorm:"column:description"` + WebsiteLink string `gorm:"column:website_link"` DeletedAt *time.Time Uploader *User `gorm:"ForeignKey:uploader"` @@ -40,10 +40,10 @@ type Torrent struct { OldComments []OldComment `gorm:"ForeignKey:torrent_id"` Comments []Comment `gorm:"ForeignKey:torrent_id"` - Seeders uint32 `gorm:"column:seeders"` - Leechers uint32 `gorm:"column:leechers"` - Completed uint32 `gorm:"column:completed"` - LastScrape int64 `gorm:"column:last_scrape"` + Seeders uint32 `gorm:"column:seeders"` + Leechers uint32 `gorm:"column:leechers"` + Completed uint32 `gorm:"column:completed"` + LastScrape time.Time `gorm:"column:last_scrape"` } // Returns the total size of memory recursively allocated for this struct @@ -162,7 +162,7 @@ func (t *Torrent) ToJSON() TorrentJSON { Name: t.Name, Status: t.Status, Hash: t.Hash, - Date: time.Unix(t.Date, 0).Format(time.RFC3339), + Date: t.Date.Format(time.RFC3339), Filesize: util.FormatFilesize2(t.Filesize), Description: util.MarkdownToHTML(t.Description), Comments: commentsJSON, @@ -177,7 +177,7 @@ func (t *Torrent) ToJSON() TorrentJSON { TorrentLink: util.Safe(torrentlink), Leechers: t.Leechers, Seeders: t.Seeders, - LastScrape: time.Unix(t.LastScrape, 0), + LastScrape: t.LastScrape, } return res diff --git a/router/apiHandler.go b/router/apiHandler.go index 19d2cdb8..a01fb58e 100644 --- a/router/apiHandler.go +++ b/router/apiHandler.go @@ -132,7 +132,7 @@ func ApiUploadHandler(w http.ResponseWriter, r *http.Request) { SubCategory: upload.SubCategory, Status: 1, Hash: upload.Hash, - Date: time.Now().Unix(), + Date: time.Now(), Filesize: 0, //? Description: upload.Description, UploaderID: user.ID, diff --git a/router/rssHandler.go b/router/rssHandler.go index f5dad04f..422b0622 100644 --- a/router/rssHandler.go +++ b/router/rssHandler.go @@ -19,7 +19,7 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) { createdAsTime := time.Now() if len(torrents) > 0 { - createdAsTime = time.Unix(torrents[0].Date, 0) + createdAsTime = torrents[0].Date } feed := &feeds.Feed{ Title: "Nyaa Pantsu", @@ -37,8 +37,8 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) { Title: torrents[i].Name, Link: &feeds.Link{Href: string(torrentJSON.Magnet)}, Description: "", - Created: time.Unix(torrents[0].Date, 0), - Updated: time.Unix(torrents[0].Date, 0), + Created: torrents[0].Date, + Updated: torrents[0].Date, } } diff --git a/router/uploadHandler.go b/router/uploadHandler.go index 66d2a78b..7a358da8 100644 --- a/router/uploadHandler.go +++ b/router/uploadHandler.go @@ -40,7 +40,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) { SubCategory: uploadForm.SubCategoryID, Status: 1, Hash: uploadForm.Infohash, - Date: time.Now().Unix(), + Date: time.Now(), Filesize: uploadForm.Filesize, Description: uploadForm.Description, UploaderID: user.ID} diff --git a/service/scraper/bucket.go b/service/scraper/bucket.go index b4c0b3a7..da301528 100644 --- a/service/scraper/bucket.go +++ b/service/scraper/bucket.go @@ -3,6 +3,7 @@ package scraperService import ( "math/rand" "net" + "sync" "github.com/ewhal/nyaa/model" ) @@ -11,12 +12,14 @@ const InitialConnectionID = 0x41727101980 type Bucket struct { Addr net.Addr + access sync.Mutex transactions map[uint32]*Transaction } func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) { id := rand.Uint32() // get good id + b.access.Lock() _, ok := b.transactions[id] for ok { id = rand.Uint32() @@ -24,18 +27,22 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) { } t = &Transaction{ TransactionID: id, + bucket: b, swarms: swarms, state: stateSendID, } b.transactions[id] = t + b.access.Unlock() return } func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) { + b.access.Lock() t, ok := b.transactions[tid] + b.access.Unlock() if ok { - go v(t) + v(t) } else { v(nil) } diff --git a/service/scraper/scraper.go b/service/scraper/scraper.go index 155e3409..c55d3be9 100644 --- a/service/scraper/scraper.go +++ b/service/scraper/scraper.go @@ -11,7 +11,7 @@ import ( ) // MTU yes this is the ipv6 mtu -const MTU = 1488 +const MTU = 1500 // bittorrent scraper type Scraper struct { @@ -31,7 +31,7 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) { recvQueue: make(chan *RecvEvent, 1028), errQueue: make(chan error), trackers: make(map[string]*Bucket), - ticker: time.NewTicker(time.Minute), + ticker: time.NewTicker(time.Second), interval: time.Second * time.Duration(conf.IntervalSeconds), } for idx := range conf.Trackers { @@ -127,6 +127,7 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) { log.Warnf("failed to sync swarm: %s", err) } t.Done() + log.Debugf("transaction %d done", tid) } else { sc.sendQueue <- t.SendEvent(ev.From) } @@ -142,7 +143,6 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) { } func (sc *Scraper) Run() { - sc.Scrape() for { <-sc.ticker.C sc.Scrape() @@ -150,29 +150,25 @@ func (sc *Scraper) Run() { } func (sc *Scraper) Scrape() { + now := time.Now().Add(0 - sc.interval) - swarms := make([]model.Torrent, 0, 128) - now := time.Now().Add(0 - sc.interval).Unix() - err := db.ORM.Where("last_scrape < ?", now).Or("last_scrape IS NULL").Find(&swarms).Error + rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT 700", now).Rows() if err == nil { - for swarms != nil { - var scrape []model.Torrent - if len(swarms) > 74 { - scrape = swarms[:74] - swarms = swarms[74:] - } else { - scrape = swarms - swarms = nil - } - log.Infof("scraping %d", len(scrape)) - if len(scrape) > 0 { + counter := 0 + var scrape [70]model.Torrent + for rows.Next() { + idx := counter % 70 + rows.Scan(&scrape[idx].ID, &scrape[idx].Hash) + counter++ + if idx == 0 { for _, b := range sc.trackers { - t := b.NewTransaction(scrape) - log.Debugf("new transaction %d", t.TransactionID) + t := b.NewTransaction(scrape[:]) sc.sendQueue <- t.SendEvent(b.Addr) } } } + rows.Close() + } else { log.Warnf("failed to select torrents for scrape: %s", err) } diff --git a/service/scraper/transaction.go b/service/scraper/transaction.go index 0463a574..90ebcf77 100644 --- a/service/scraper/transaction.go +++ b/service/scraper/transaction.go @@ -31,23 +31,39 @@ type Transaction struct { // Done marks this transaction as done and removes it from parent func (t *Transaction) Done() { + t.bucket.access.Lock() delete(t.bucket.transactions, t.TransactionID) + t.bucket.access.Unlock() } func (t *Transaction) handleScrapeReply(data []byte) { data = data[8:] - now := time.Now().Unix() + now := time.Now() for idx := range t.swarms { - t.swarms[idx].Seeders = binary.BigEndian.Uint32(data[:idx*12]) - t.swarms[idx].Completed = binary.BigEndian.Uint32(data[:(idx*12)+4]) - t.swarms[idx].Leechers = binary.BigEndian.Uint32(data[:(idx*12)+8]) + t.swarms[idx].Seeders = binary.BigEndian.Uint32(data) + data = data[4:] + t.swarms[idx].Completed = binary.BigEndian.Uint32(data) + data = data[4:] + t.swarms[idx].Leechers = binary.BigEndian.Uint32(data) + data = data[4:] t.swarms[idx].LastScrape = now + idx++ } } // Sync syncs models with database func (t *Transaction) Sync() (err error) { - err = db.ORM.Update(t.swarms).Error + for idx := range t.swarms { + err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{ + "seeders": t.swarms[idx].Seeders, + "leechers": t.swarms[idx].Leechers, + "completed": t.swarms[idx].Completed, + "last_scrape": t.swarms[idx].LastScrape, + }).Error + if err != nil { + break + } + } return } @@ -71,6 +87,7 @@ func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) { copy(ev.Data[16+(idx*20):], ih) } } + t.state = stateTransact } else if t.state == stateSendID { ev.Data = make([]byte, 16) binary.BigEndian.PutUint64(ev.Data, InitialConnectionID) @@ -100,7 +117,7 @@ func (t *Transaction) GotData(data []byte) (done bool) { break case actionScrape: if len(data) == (12*len(t.swarms))+8 && t.state == stateTransact { - t.handleScrapeReply(data[8:]) + t.handleScrapeReply(data) } done = true break