From dfa0b2ab73bbe9c3b35fbb252ff476d092996a82 Mon Sep 17 00:00:00 2001
From: Jeff Becker
Date: Wed, 10 May 2017 13:29:35 -0400
Subject: [PATCH] initial

---
 config/config.go               |   5 +-
 config/scrape.go               |  29 ++++++
 db/gorm.go                     |  10 +-
 main.go                        |  48 ++++++++-
 model/torrent.go               |  42 +++++---
 network/network.go             |  14 +++
 router/apiHandler.go           |   2 +-
 router/rssHandler.go           |   6 +-
 router/uploadHandler.go        |   2 +-
 service/scraper/bucket.go      |  50 ++++++++++
 service/scraper/errors.go      |   7 ++
 service/scraper/event.go       |  36 +++++++
 service/scraper/scraper.go     | 183 +++++++++++++++++++++++++++++++++
 service/scraper/transaction.go | 120 +++++++++++++++++++++
 templates/view.html            |   4 +
 15 files changed, 531 insertions(+), 27 deletions(-)
 create mode 100644 config/scrape.go
 create mode 100644 service/scraper/bucket.go
 create mode 100644 service/scraper/errors.go
 create mode 100644 service/scraper/event.go
 create mode 100644 service/scraper/scraper.go
 create mode 100644 service/scraper/transaction.go

diff --git a/config/config.go b/config/config.go
index 9f14548d..bea5b8d9 100644
--- a/config/config.go
+++ b/config/config.go
@@ -22,11 +22,13 @@ type Config struct {
 	// DBParams will be directly passed to Gorm, and its internal
 	// structure depends on the dialect for each db type
 	DBParams string `json:"db_params"`
+	// tracker scraper config (required)
+	Scrape ScraperConfig `json:"scraper"`
 	// optional i2p configuration
 	I2P *I2PConfig `json:"i2p"`
 }
 
-var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", nil}
+var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", DefaultScraperConfig, nil}
 
 var allowedDatabaseTypes = map[string]bool{
 	"sqlite3": true,
@@ -41,6 +43,7 @@ func New() *Config {
 	config.Port = Defaults.Port
 	config.DBType = Defaults.DBType
 	config.DBParams = Defaults.DBParams
+	config.Scrape = Defaults.Scrape
 	return &config
 }
 
diff --git a/config/scrape.go b/config/scrape.go
new file mode 100644
index 00000000..f72e385e
--- /dev/null
+++ b/config/scrape.go
@@ -0,0 +1,29 @@
+package config
+
+type ScrapeConfig struct {
+	URL             string `json:"scrape_url"`
+	Name            string `json:"name"`
+	IntervalSeconds int64  `json:"interval"`
+}
+
+type ScraperConfig struct {
+	Addr            string         `json:"bind"`
+	NumWorkers      int            `json:"workers"`
+	IntervalSeconds int64          `json:"default_interval"`
+	Trackers        []ScrapeConfig `json:"trackers"`
+}
+
+// DefaultScraperConfig is the default config for bittorrent scraping
+var DefaultScraperConfig = ScraperConfig{
+	Addr: ":9999",
+	// TODO: query the system for a sensible worker count?
+	NumWorkers: 4,
+	// every hour
+	IntervalSeconds: 60 * 60,
+	Trackers: []ScrapeConfig{
+		{
+			URL:  "udp://tracker.doko.moe:6969/",
+			Name: "doko.moe",
+		},
+	},
+}
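
NOTE: the TODO above asks about querying the system. A minimal sketch of deriving
the worker default from the host's logical CPU count instead of a hard-coded 4
(hypothetical helper, not part of this patch; RunScraper in main.go still clamps
the configured value to at least 1):

	import "runtime"

	// defaultWorkers picks one scrape worker per logical CPU.
	func defaultWorkers() int {
		return runtime.NumCPU()
	}
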
diff --git a/db/gorm.go b/db/gorm.go
index 8872bd17..30aa729e 100644
--- a/db/gorm.go
+++ b/db/gorm.go
@@ -1,10 +1,10 @@
 package db
 
 import (
+	"github.com/azhao12345/gorm"
 	"github.com/ewhal/nyaa/config"
 	"github.com/ewhal/nyaa/model"
 	"github.com/ewhal/nyaa/util/log"
-	"github.com/azhao12345/gorm"
 	_ "github.com/jinzhu/gorm/dialects/postgres"
 	_ "github.com/jinzhu/gorm/dialects/sqlite"
 )
@@ -27,13 +27,13 @@ func GormInit(conf *config.Config) (*gorm.DB, error) {
 	db.DB().SetMaxIdleConns(10)
 	db.DB().SetMaxOpenConns(100)
 
-	// TODO: Enable Gorm initialization for non-development builds
 	if config.Environment == "DEVELOPMENT" {
 		db.LogMode(true)
 	}
+	db.AutoMigrate(&model.User{}, &model.UserFollows{}, &model.UserUploadsOld{})
+	db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{})
+	db.AutoMigrate(&model.Comment{}, &model.OldComment{})
+
 	return db, nil
 }
 
diff --git a/main.go b/main.go
index 3c89c7a8..8381e429 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@
 import (
 	"bufio"
 	"flag"
+	"net/http"
 	"os"
 	"path/filepath"
 
@@ -13,6 +14,7 @@ import (
 	"github.com/ewhal/nyaa/db"
 	"github.com/ewhal/nyaa/network"
 	"github.com/ewhal/nyaa/router"
+	"github.com/ewhal/nyaa/service/scraper"
 	"github.com/ewhal/nyaa/util/log"
 	"github.com/ewhal/nyaa/util/signals"
 	"github.com/nicksnyder/go-i18n/i18n"
@@ -29,6 +31,7 @@ func initI18N() {
 	}
 }
 
+// RunServer runs webapp mainloop
 func RunServer(conf *config.Config) {
 	http.Handle("/", router.Router)
 
@@ -54,10 +57,46 @@ func RunServer(conf *config.Config) {
 	}
 }
 
+// RunScraper runs tracker scraper mainloop
+func RunScraper(conf *config.Config) {
+
+	// bind to network
+	pc, err := network.CreateScraperSocket(conf)
+	if err != nil {
+		log.Fatalf("failed to bind udp socket for scraper: %s", err)
+	}
+	// configure tracker scraper
+	var scraper *scraperService.Scraper
+	scraper, err = scraperService.New(&conf.Scrape)
+	if err != nil {
+		pc.Close()
+		log.Fatalf("failed to configure scraper: %s", err)
+	}
+
+	workers := conf.Scrape.NumWorkers
+	if workers < 1 {
+		workers = 1
+	}
+
+	// register udp socket with signals
+	signals.RegisterCloser(pc)
+	// register scraper with signals
+	signals.RegisterCloser(scraper)
+	// run udp scraper workers
+	for workers > 0 {
+		go scraper.RunWorker(pc)
+		workers--
+	}
+	// run scraper
+	go scraper.Run()
+	scraper.Wait()
+}
+
 func main() {
 	conf := config.New()
 	processFlags := conf.BindFlags()
 	defaults := flag.Bool("print-defaults", false, "print the default configuration file on stdout")
+	mode := flag.String("mode", "webapp", "which mode to run daemon in, either webapp or scraper")
 	flag.Float64Var(&cache.Size, "c", cache.Size, "size of the search cache in MB")
 	flag.Parse()
 	if *defaults {
@@ -88,6 +127,13 @@ func main() {
 				log.Fatal(err.Error())
 			}
 		}
-		RunServer(conf)
+		if *mode == "scraper" {
+			log.Init("DEVELOPMENT")
+			RunScraper(conf)
+		} else if *mode == "webapp" {
+			RunServer(conf)
+		} else {
+			log.Fatalf("invalid runtime mode: %s", *mode)
+		}
 	}
 }
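
NOTE: the model change below stores Torrent.Date as unix epoch seconds (int64)
instead of time.Time, so every producer and consumer converts explicitly. The two
conversions every call site in this patch uses:

	date := time.Now().Unix()                           // writing: wall clock -> epoch seconds
	rendered := time.Unix(date, 0).Format(time.RFC3339) // reading: epoch seconds -> RFC 3339 string
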
diff --git a/model/torrent.go b/model/torrent.go
index 0d2dcde2..c349015c 100644
--- a/model/torrent.go
+++ b/model/torrent.go
@@ -20,25 +20,30 @@ type Feed struct {
 }
 
 type Torrent struct {
-	ID          uint      `gorm:"column:torrent_id;primary_key"`
-	Name        string    `gorm:"column:torrent_name"`
-	Hash        string    `gorm:"column:torrent_hash"`
-	Category    int       `gorm:"column:category"`
-	SubCategory int       `gorm:"column:sub_category"`
-	Status      int       `gorm:"column:status"`
-	Date        time.Time `gorm:"column:date"`
-	UploaderID  uint      `gorm:"column:uploader"`
-	Downloads   int       `gorm:"column:downloads"`
-	Stardom     int       `gorm:"column:stardom"`
-	Filesize    int64     `gorm:"column:filesize"`
-	Description string    `gorm:"column:description"`
-	WebsiteLink string    `gorm:"column:website_link"`
+	ID          uint   `gorm:"column:torrent_id;primary_key"`
+	Name        string `gorm:"column:torrent_name"`
+	Hash        string `gorm:"column:torrent_hash"`
+	Category    int    `gorm:"column:category"`
+	SubCategory int    `gorm:"column:sub_category"`
+	Status      int    `gorm:"column:status"`
+	Date        int64  `gorm:"column:date"`
+	UploaderID  uint   `gorm:"column:uploader"`
+	Downloads   int    `gorm:"column:downloads"`
+	Stardom     int    `gorm:"column:stardom"`
+	Filesize    int64  `gorm:"column:filesize"`
+	Description string `gorm:"column:description"`
+	WebsiteLink string `gorm:"column:website_link"`
 	DeletedAt   *time.Time
 
 	Uploader    *User        `gorm:"ForeignKey:UploaderId"`
 	OldUploader string       `gorm:"-"` // ???????
 	OldComments []OldComment `gorm:"ForeignKey:torrent_id"`
 	Comments    []Comment    `gorm:"ForeignKey:torrent_id"`
+
+	Seeders    uint32 `gorm:"column:seeders"`
+	Leechers   uint32 `gorm:"column:leechers"`
+	Completed  uint32 `gorm:"column:completed"`
+	LastScrape int64  `gorm:"column:last_scrape"`
 }
 
 // Returns the total size of memory recursively allocated for this struct
@@ -113,6 +118,9 @@ type TorrentJSON struct {
 	WebsiteLink template.URL `json:"website_link"`
 	Magnet      template.URL `json:"magnet"`
 	TorrentLink template.URL `json:"torrent"`
+	Seeders     uint32       `json:"seeders"`
+	Leechers    uint32       `json:"leechers"`
+	LastScrape  time.Time    `json:"last_scrape"`
 }
 
 type TorrentReportJson struct {
@@ -154,7 +162,7 @@ func (t *Torrent) ToJSON() TorrentJSON {
 		Name:        t.Name,
 		Status:      t.Status,
 		Hash:        t.Hash,
-		Date:        t.Date.Format(time.RFC3339),
+		Date:        time.Unix(t.Date, 0).Format(time.RFC3339),
 		Filesize:    util.FormatFilesize2(t.Filesize),
 		Description: util.MarkdownToHTML(t.Description),
 		Comments:    commentsJSON,
@@ -166,7 +174,11 @@ func (t *Torrent) ToJSON() TorrentJSON {
 		OldUploader: util.SafeText(t.OldUploader),
 		WebsiteLink: util.Safe(t.WebsiteLink),
 		Magnet:      util.Safe(magnet),
-		TorrentLink: util.Safe(torrentlink)}
+		TorrentLink: util.Safe(torrentlink),
+		Leechers:    t.Leechers,
+		Seeders:     t.Seeders,
+		LastScrape:  time.Unix(t.LastScrape, 0),
+	}
 
 	return res
 }
diff --git a/network/network.go b/network/network.go
index d851da0a..58ad8eb3 100644
--- a/network/network.go
+++ b/network/network.go
@@ -25,3 +25,17 @@ func CreateHTTPListener(conf *config.Config) (l net.Listener, err error) {
 	}
 	return
 }
+
+// CreateScraperSocket creates a UDP scraper socket
+func CreateScraperSocket(conf *config.Config) (pc net.PacketConn, err error) {
+	if conf.I2P == nil {
+		var laddr *net.UDPAddr
+		laddr, err = net.ResolveUDPAddr("udp", conf.Scrape.Addr)
+		if err == nil {
+			pc, err = net.ListenUDP("udp", laddr)
+		}
+	} else {
+		log.Fatal("i2p udp scraper not supported")
+	}
+	return
+}
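
NOTE: CreateScraperSocket above calls log.Fatal from library code on the i2p
path, which exits the process before RunScraper can report anything, and err
stays nil. A hedged alternative that keeps the decision in main (the sentinel
error is hypothetical, not part of this patch):

	var ErrNoI2PUDP = errors.New("i2p udp scraper not supported")

	// inside CreateScraperSocket: returning the error lets the caller
	// log and exit on its own terms
	if conf.I2P != nil {
		return nil, ErrNoI2PUDP
	}
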
diff --git a/router/apiHandler.go b/router/apiHandler.go
index a01fb58e..19d2cdb8 100644
--- a/router/apiHandler.go
+++ b/router/apiHandler.go
@@ -132,7 +132,7 @@ func ApiUploadHandler(w http.ResponseWriter, r *http.Request) {
 			SubCategory: upload.SubCategory,
 			Status:      1,
 			Hash:        upload.Hash,
-			Date:        time.Now(),
+			Date:        time.Now().Unix(),
 			Filesize:    0, //?
 			Description: upload.Description,
 			UploaderID:  user.ID,
diff --git a/router/rssHandler.go b/router/rssHandler.go
index 422b0622..f5dad04f 100644
--- a/router/rssHandler.go
+++ b/router/rssHandler.go
@@ -19,7 +19,7 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) {
 
 	createdAsTime := time.Now()
 	if len(torrents) > 0 {
-		createdAsTime = torrents[0].Date
+		createdAsTime = time.Unix(torrents[0].Date, 0)
 	}
 	feed := &feeds.Feed{
 		Title: "Nyaa Pantsu",
@@ -37,8 +37,8 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) {
 			Title:       torrents[i].Name,
 			Link:        &feeds.Link{Href: string(torrentJSON.Magnet)},
 			Description: "",
-			Created:     torrents[0].Date,
-			Updated:     torrents[0].Date,
+			Created:     time.Unix(torrents[i].Date, 0),
+			Updated:     time.Unix(torrents[i].Date, 0),
 		}
 	}
 
diff --git a/router/uploadHandler.go b/router/uploadHandler.go
index 7a358da8..66d2a78b 100644
--- a/router/uploadHandler.go
+++ b/router/uploadHandler.go
@@ -40,7 +40,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) {
 			SubCategory: uploadForm.SubCategoryID,
 			Status:      1,
 			Hash:        uploadForm.Infohash,
-			Date:        time.Now(),
+			Date:        time.Now().Unix(),
 			Filesize:    uploadForm.Filesize,
 			Description: uploadForm.Description,
 			UploaderID:  user.ID}
diff --git a/service/scraper/bucket.go b/service/scraper/bucket.go
new file mode 100644
index 00000000..b4c0b3a7
--- /dev/null
+++ b/service/scraper/bucket.go
@@ -0,0 +1,50 @@
+package scraperService
+
+import (
+	"math/rand"
+	"net"
+
+	"github.com/ewhal/nyaa/model"
+)
+
+const InitialConnectionID = 0x41727101980
+
+type Bucket struct {
+	Addr         net.Addr
+	transactions map[uint32]*Transaction
+}
+
+func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
+	id := rand.Uint32()
+	// find an unused transaction id
+	_, ok := b.transactions[id]
+	for ok {
+		id = rand.Uint32()
+		_, ok = b.transactions[id]
+	}
+	t = &Transaction{
+		TransactionID: id,
+		bucket:        b,
+		swarms:        swarms,
+		state:         stateSendID,
+	}
+	b.transactions[id] = t
+	return
+
+}
+
+func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) {
+	t, ok := b.transactions[tid]
+	if ok {
+		v(t)
+	} else {
+		v(nil)
+	}
+}
+
+func NewBucket(a net.Addr) *Bucket {
+	return &Bucket{
+		transactions: make(map[uint32]*Transaction),
+		Addr:         a,
+	}
+}
diff --git a/service/scraper/errors.go b/service/scraper/errors.go
new file mode 100644
index 00000000..4b0b8584
--- /dev/null
+++ b/service/scraper/errors.go
@@ -0,0 +1,7 @@
+package scraperService
+
+import (
+	"errors"
+)
+
+var ErrShortPacket = errors.New("short udp packet")
diff --git a/service/scraper/event.go b/service/scraper/event.go
new file mode 100644
index 00000000..c3eef73d
--- /dev/null
+++ b/service/scraper/event.go
@@ -0,0 +1,36 @@
+package scraperService
+
+import (
+	"encoding/binary"
+	"net"
+)
+
+type RecvEvent struct {
+	From net.Addr
+	Data []byte
+}
+
+// TID extracts the transaction id
+func (ev *RecvEvent) TID() (id uint32, err error) {
+	if len(ev.Data) < 8 {
+		err = ErrShortPacket
+	} else {
+		id = binary.BigEndian.Uint32(ev.Data[4:])
+	}
+	return
+}
+
+// Action extracts the action
+func (ev *RecvEvent) Action() (action uint32, err error) {
+	if len(ev.Data) < 4 {
+		err = ErrShortPacket
+	} else {
+		action = binary.BigEndian.Uint32(ev.Data)
+	}
+	return
+}
+
+type SendEvent struct {
+	To   net.Addr
+	Data []byte
+}
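
NOTE: TID and Action above assume the BEP 15 response header: bytes 0-3 are the
action and bytes 4-7 the transaction id, both big-endian. For a local test
harness, a connect response (action 0) can be built like this (the helper is
illustrative, not part of the patch):

	// connect response: action(4) | transaction_id(4) | connection_id(8)
	func buildConnectResponse(tid uint32, connID uint64) []byte {
		buf := make([]byte, 16)
		binary.BigEndian.PutUint32(buf[0:], 0) // action 0 = connect
		binary.BigEndian.PutUint32(buf[4:], tid)
		binary.BigEndian.PutUint64(buf[8:], connID)
		return buf
	}
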
"github.com/ewhal/nyaa/model" + "github.com/ewhal/nyaa/util/log" + "net" + "net/url" + "time" +) + +// MTU yes this is the ipv6 mtu +const MTU = 1488 + +// bittorrent scraper +type Scraper struct { + done chan int + sendQueue chan *SendEvent + recvQueue chan *RecvEvent + errQueue chan error + trackers map[string]*Bucket + ticker *time.Ticker + interval time.Duration +} + +func New(conf *config.ScraperConfig) (sc *Scraper, err error) { + sc = &Scraper{ + done: make(chan int), + sendQueue: make(chan *SendEvent, 128), + recvQueue: make(chan *RecvEvent, 1028), + errQueue: make(chan error), + trackers: make(map[string]*Bucket), + ticker: time.NewTicker(time.Minute), + interval: time.Second * time.Duration(conf.IntervalSeconds), + } + for idx := range conf.Trackers { + err = sc.AddTracker(&conf.Trackers[idx]) + if err != nil { + break + } + } + return +} + +func (sc *Scraper) AddTracker(conf *config.ScrapeConfig) (err error) { + var u *url.URL + u, err = url.Parse(conf.URL) + if err == nil { + var ips []net.IP + ips, err = net.LookupIP(u.Hostname()) + if err == nil { + // TODO: use more than 1 ip ? + addr := &net.UDPAddr{ + IP: ips[0], + } + addr.Port, err = net.LookupPort("udp", u.Port()) + if err == nil { + sc.trackers[addr.String()] = NewBucket(addr) + } + } + } + return +} + +func (sc *Scraper) Close() (err error) { + close(sc.sendQueue) + close(sc.recvQueue) + close(sc.errQueue) + sc.ticker.Stop() + sc.done <- 1 + return +} + +func (sc *Scraper) runRecv(pc net.PacketConn) { + for { + var buff [MTU]byte + n, from, err := pc.ReadFrom(buff[:]) + + if err == nil { + + log.Debugf("got %d from %s", n, from) + sc.recvQueue <- &RecvEvent{ + From: from, + Data: buff[:n], + } + } else { + sc.errQueue <- err + } + } +} + +func (sc *Scraper) runSend(pc net.PacketConn) { + for { + ev, ok := <-sc.sendQueue + if !ok { + return + } + log.Debugf("write %d to %s", len(ev.Data), ev.To) + pc.WriteTo(ev.Data, ev.To) + } +} + +func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) { + + go sc.runRecv(pc) + go sc.runSend(pc) + for { + var bucket *Bucket + ev, ok := <-sc.recvQueue + if !ok { + break + } + tid, err := ev.TID() + action, err := ev.Action() + log.Debugf("transaction = %d action = %d", tid, action) + if err == nil { + bucket, ok = sc.trackers[ev.From.String()] + if ok && bucket != nil { + bucket.VisitTransaction(tid, func(t *Transaction) { + if t == nil { + log.Warnf("no transaction %d", tid) + } else { + if t.GotData(ev.Data) { + err := t.Sync() + if err != nil { + log.Warnf("failed to sync swarm: %s", err) + } + t.Done() + } else { + sc.sendQueue <- t.SendEvent(ev.From) + } + } + }) + } else { + log.Warnf("bucket not found for %s", ev.From) + } + } + + } + return +} + +func (sc *Scraper) Run() { + sc.Scrape() + for { + <-sc.ticker.C + sc.Scrape() + } +} + +func (sc *Scraper) Scrape() { + + swarms := make([]model.Torrent, 0, 128) + now := time.Now().Add(0 - sc.interval).Unix() + err := db.ORM.Where("last_scrape < ?", now).Find(&swarms).Error + if err == nil { + for swarms != nil { + var scrape []model.Torrent + if len(swarms) > 74 { + scrape = swarms[:74] + swarms = swarms[74:] + } else { + scrape = swarms + swarms = nil + } + log.Infof("scraping %d", len(scrape)) + if len(scrape) > 0 { + for _, b := range sc.trackers { + t := b.NewTransaction(scrape) + log.Debugf("new transaction %d", t.TransactionID) + sc.sendQueue <- t.SendEvent(b.Addr) + } + } + } + } else { + log.Warnf("failed to select torrents for scrape: %s", err) + } +} + +func (sc *Scraper) Wait() { + <-sc.done +} diff --git 
a/service/scraper/transaction.go b/service/scraper/transaction.go
new file mode 100644
index 00000000..0463a574
--- /dev/null
+++ b/service/scraper/transaction.go
@@ -0,0 +1,120 @@
+package scraperService
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"net"
+	"time"
+
+	"github.com/ewhal/nyaa/db"
+	"github.com/ewhal/nyaa/model"
+	"github.com/ewhal/nyaa/util/log"
+)
+
+const stateSendID = 0
+const stateRecvID = 1
+const stateTransact = 2
+
+const actionError = 3
+const actionScrape = 2
+const actionAnnounce = 1
+const actionConnect = 0
+
+// Transaction is a scrape transaction on a udp tracker
+type Transaction struct {
+	TransactionID uint32
+	ConnectionID  uint64
+	bucket        *Bucket
+	state         uint8
+	swarms        []model.Torrent
+}
+
+// Done marks this transaction as done and removes it from its parent bucket
+func (t *Transaction) Done() {
+	delete(t.bucket.transactions, t.TransactionID)
+}
+
+func (t *Transaction) handleScrapeReply(data []byte) {
+	data = data[8:]
+	now := time.Now().Unix()
+	for idx := range t.swarms {
+		t.swarms[idx].Seeders = binary.BigEndian.Uint32(data[idx*12:])
+		t.swarms[idx].Completed = binary.BigEndian.Uint32(data[(idx*12)+4:])
+		t.swarms[idx].Leechers = binary.BigEndian.Uint32(data[(idx*12)+8:])
+		t.swarms[idx].LastScrape = now
+	}
+}
+
+// Sync syncs models with database
+func (t *Transaction) Sync() (err error) {
+	for idx := range t.swarms {
+		err = db.ORM.Save(&t.swarms[idx]).Error
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+// SendEvent creates the next packet to send for this transaction
+func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) {
+	ev = &SendEvent{
+		To: to,
+	}
+	if t.state == stateRecvID {
+		l := len(t.swarms) * 20
+		l += 16
+
+		ev.Data = make([]byte, l)
+
+		binary.BigEndian.PutUint64(ev.Data[:], t.ConnectionID)
+		binary.BigEndian.PutUint32(ev.Data[8:], 2)
+		binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
+		for idx := range t.swarms {
+			ih, err := hex.DecodeString(t.swarms[idx].Hash)
+			if err == nil && len(ih) == 20 {
+				copy(ev.Data[16+(idx*20):], ih)
+			}
+		}
+		t.state = stateTransact
+	} else if t.state == stateSendID {
+		ev.Data = make([]byte, 16)
+		binary.BigEndian.PutUint64(ev.Data, InitialConnectionID)
+		binary.BigEndian.PutUint32(ev.Data[8:], 0)
+		binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
+		t.state = stateRecvID
+	}
+	return
+}
+
+func (t *Transaction) handleError(msg string) {
+	log.Infof("scrape failed: %s", msg)
+}
+
+// GotData handles a reply for this transaction; done reports whether it finished
+func (t *Transaction) GotData(data []byte) (done bool) {
+
+	if len(data) > 4 {
+		cmd := binary.BigEndian.Uint32(data)
+		switch cmd {
+		case actionConnect:
+			if len(data) == 16 {
+				if t.state == stateRecvID {
+					t.ConnectionID = binary.BigEndian.Uint64(data[8:])
+				}
+			}
+		case actionScrape:
+			if len(data) == (12*len(t.swarms))+8 && t.state == stateTransact {
+				t.handleScrapeReply(data)
+			}
+			done = true
+		case actionError:
+			if len(data) == 12 {
+				t.handleError(string(data[4:12]))
+			}
+		default:
+			done = true
+		}
+	}
+	return
+}
diff --git a/templates/view.html b/templates/view.html
index c3bd33b8..e2023b57 100644
--- a/templates/view.html
+++ b/templates/view.html
@@ -26,6 +26,10 @@
 			<tr>
 				<td>{{T "size"}}</td>
 				<td>{{.Filesize}}</td>
 			</tr>
+			<tr>
+				<td>Swarm</td>
+				<td>{{.Leechers}} (leech) / {{.Seeders}} (seed) / {{.LastScrape}} (updated)</td>
+			</tr>
 			<tr>
 				<td>Uploader</td>
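
NOTE: for reference, the scrape reply that handleScrapeReply walks is, per
BEP 15, an 8-byte header (action, transaction id) followed by one 12-byte record
per requested infohash, in request order: seeders, completed, leechers, each a
big-endian uint32. A standalone sketch of the per-record offsets (helper and
struct are illustrative, not part of the patch; body is assumed to be the reply
with the 8-byte header already stripped and length 12*n):

	import "encoding/binary"

	type swarmStats struct{ Seeders, Completed, Leechers uint32 }

	func parseScrapeReply(body []byte, n int) []swarmStats {
		out := make([]swarmStats, 0, n)
		for i := 0; i < n; i++ {
			rec := body[i*12:]
			out = append(out, swarmStats{
				Seeders:   binary.BigEndian.Uint32(rec[0:4]),
				Completed: binary.BigEndian.Uint32(rec[4:8]),
				Leechers:  binary.BigEndian.Uint32(rec[8:12]),
			})
		}
		return out
	}
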