Albirew/nyaa-pantsu
Archivé
1
0
Bifurcation 0

Merge branch 'scraper' of git://github.com/majestrate/nyaa into majestrate-scraper

Cette révision appartient à :
Eliot Whalan 2017-05-11 09:11:54 +10:00
révision 30d4ba5e9c
12 fichiers modifiés avec 525 ajouts et 10 suppressions

Voir le fichier

@ -22,11 +22,13 @@ type Config struct {
// DBParams will be directly passed to Gorm, and its internal
// structure depends on the dialect for each db type
DBParams string `json:"db_params"`
// tracker scraper config (required)
Scrape ScraperConfig `json:"scraper"`
// optional i2p configuration
I2P *I2PConfig `json:"i2p"`
}
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", nil}
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", DefaultScraperConfig, nil}
var allowedDatabaseTypes = map[string]bool{
"sqlite3": true,
@ -41,6 +43,7 @@ func New() *Config {
config.Port = Defaults.Port
config.DBType = Defaults.DBType
config.DBParams = Defaults.DBParams
config.Scrape = Defaults.Scrape
return &config
}

29
config/scrape.go Fichier normal
Voir le fichier

@ -0,0 +1,29 @@
package config
type ScrapeConfig struct {
URL string `json:"scrape_url"`
Name string `json:"name"`
IntervalSeconds int64 `json:"interval"`
}
type ScraperConfig struct {
Addr string `json:"bind"`
NumWorkers int `json:"workers"`
IntervalSeconds int64 `json:"default_interval"`
Trackers []ScrapeConfig `json:"trackers"`
}
// DefaultScraperConfig is the default config for bittorrent scraping
var DefaultScraperConfig = ScraperConfig{
Addr: ":9999",
// TODO: query system?
NumWorkers: 4,
// every hour
IntervalSeconds: 60 * 60,
Trackers: []ScrapeConfig{
ScrapeConfig{
URL: "udp://tracker.coppersurfer.tk:6969/",
Name: "coppersurfer.tk",
},
},
}

Voir le fichier

@ -1,10 +1,10 @@
package db
import (
"github.com/azhao12345/gorm"
"github.com/ewhal/nyaa/config"
"github.com/ewhal/nyaa/model"
"github.com/ewhal/nyaa/util/log"
"github.com/azhao12345/gorm"
_ "github.com/jinzhu/gorm/dialects/postgres"
_ "github.com/jinzhu/gorm/dialects/sqlite"
)
@ -27,13 +27,13 @@ func GormInit(conf *config.Config) (*gorm.DB, error) {
db.DB().SetMaxIdleConns(10)
db.DB().SetMaxOpenConns(100)
// TODO: Enable Gorm initialization for non-development builds
if config.Environment == "DEVELOPMENT" {
db.LogMode(true)
db.AutoMigrate(&model.User{}, &model.UserFollows{}, &model.UserUploadsOld{})
db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{})
db.AutoMigrate(&model.Comment{}, &model.OldComment{})
}
db.AutoMigrate(&model.User{}, &model.UserFollows{}, &model.UserUploadsOld{})
db.AutoMigrate(&model.Torrent{}, &model.TorrentReport{})
db.AutoMigrate(&model.Comment{}, &model.OldComment{})
return db, nil
}

47
main.go
Voir le fichier

@ -3,6 +3,7 @@ package main
import (
"bufio"
"flag"
"net/http"
"os"
"path/filepath"
@ -13,6 +14,7 @@ import (
"github.com/ewhal/nyaa/db"
"github.com/ewhal/nyaa/network"
"github.com/ewhal/nyaa/router"
"github.com/ewhal/nyaa/service/scraper"
"github.com/ewhal/nyaa/util/log"
"github.com/ewhal/nyaa/util/signals"
"github.com/nicksnyder/go-i18n/i18n"
@ -29,6 +31,7 @@ func initI18N() {
}
}
// RunServer runs webapp mainloop
func RunServer(conf *config.Config) {
http.Handle("/", router.Router)
@ -54,10 +57,46 @@ func RunServer(conf *config.Config) {
}
}
// RunScraper runs tracker scraper mainloop
func RunScraper(conf *config.Config) {
// bind to network
pc, err := network.CreateScraperSocket(conf)
if err != nil {
log.Fatalf("failed to bind udp socket for scraper: %s", err)
}
// configure tracker scraperv
var scraper *scraperService.Scraper
scraper, err = scraperService.New(&conf.Scrape)
if err != nil {
pc.Close()
log.Fatalf("failed to configure scraper: %s", err)
}
workers := conf.Scrape.NumWorkers
if workers < 1 {
workers = 1
}
// register udp socket with signals
signals.RegisterCloser(pc)
// register scraper with signals
signals.RegisterCloser(scraper)
// run udp scraper worker
for workers > 0 {
go scraper.RunWorker(pc)
workers--
}
// run scraper
go scraper.Run()
scraper.Wait()
}
func main() {
conf := config.New()
processFlags := conf.BindFlags()
defaults := flag.Bool("print-defaults", false, "print the default configuration file on stdout")
mode := flag.String("mode", "webapp", "which mode to run daemon in, either webapp or scraper")
flag.Float64Var(&cache.Size, "c", cache.Size, "size of the search cache in MB")
flag.Parse()
if *defaults {
@ -88,6 +127,12 @@ func main() {
log.Fatal(err.Error())
}
}
RunServer(conf)
if *mode == "scraper" {
RunScraper(conf)
} else if *mode == "webapp" {
RunServer(conf)
} else {
log.Fatalf("invalid runtime mode: %s", *mode)
}
}
}

Voir le fichier

@ -39,6 +39,11 @@ type Torrent struct {
OldUploader string `gorm:"-"` // ???????
OldComments []OldComment `gorm:"ForeignKey:torrent_id"`
Comments []Comment `gorm:"ForeignKey:torrent_id"`
Seeders uint32 `gorm:"column:seeders"`
Leechers uint32 `gorm:"column:leechers"`
Completed uint32 `gorm:"column:completed"`
LastScrape time.Time `gorm:"column:last_scrape"`
}
// Returns the total size of memory recursively allocated for this struct
@ -101,6 +106,9 @@ type TorrentJSON struct {
WebsiteLink template.URL `json:"website_link"`
Magnet template.URL `json:"magnet"`
TorrentLink template.URL `json:"torrent"`
Seeders uint32 `json:"seeders"`
Leechers uint32 `json:"leechers"`
LastScrape time.Time `json:"last_scrape"`
}
// ToJSON converts a model.Torrent to its equivalent JSON structure
@ -140,7 +148,11 @@ func (t *Torrent) ToJSON() TorrentJSON {
OldUploader: util.SafeText(t.OldUploader),
WebsiteLink: util.Safe(t.WebsiteLink),
Magnet: template.URL(magnet),
TorrentLink: util.Safe(torrentlink)}
TorrentLink: util.Safe(torrentlink),
Leechers: t.Leechers,
Seeders: t.Seeders,
LastScrape: t.LastScrape,
}
return res
}

Voir le fichier

@ -25,3 +25,17 @@ func CreateHTTPListener(conf *config.Config) (l net.Listener, err error) {
}
return
}
// CreateScraperSocket creates a UDP Scraper socket
func CreateScraperSocket(conf *config.Config) (pc net.PacketConn, err error) {
if conf.I2P == nil {
var laddr *net.UDPAddr
laddr, err = net.ResolveUDPAddr("udp", conf.Scrape.Addr)
if err == nil {
pc, err = net.ListenUDP("udp", laddr)
}
} else {
log.Fatal("i2p udp scraper not supported")
}
return
}

56
service/scraper/bucket.go Fichier normal
Voir le fichier

@ -0,0 +1,56 @@
package scraperService
import (
"math/rand"
"net"
"sync"
"github.com/ewhal/nyaa/model"
)
const InitialConnectionID = 0x41727101980
type Bucket struct {
Addr net.Addr
access sync.Mutex
transactions map[uint32]*Transaction
}
func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
id := rand.Uint32()
// get good id
b.access.Lock()
_, ok := b.transactions[id]
for ok {
id = rand.Uint32()
_, ok = b.transactions[id]
}
t = &Transaction{
TransactionID: id,
bucket: b,
swarms: swarms,
state: stateSendID,
}
b.transactions[id] = t
b.access.Unlock()
return
}
func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) {
b.access.Lock()
t, ok := b.transactions[tid]
b.access.Unlock()
if ok {
v(t)
} else {
v(nil)
}
}
func NewBucket(a net.Addr) *Bucket {
return &Bucket{
transactions: make(map[uint32]*Transaction),
Addr: a,
}
}

7
service/scraper/errors.go Fichier normal
Voir le fichier

@ -0,0 +1,7 @@
package scraperService
import (
"errors"
)
var ErrShortPacket = errors.New("short udp packet")

36
service/scraper/event.go Fichier normal
Voir le fichier

@ -0,0 +1,36 @@
package scraperService
import (
"encoding/binary"
"net"
)
type RecvEvent struct {
From net.Addr
Data []byte
}
// TID extract transaction id
func (ev *RecvEvent) TID() (id uint32, err error) {
if len(ev.Data) < 8 {
err = ErrShortPacket
} else {
id = binary.BigEndian.Uint32(ev.Data[4:])
}
return
}
// Action extract action
func (ev *RecvEvent) Action() (action uint32, err error) {
if len(ev.Data) < 4 {
err = ErrShortPacket
} else {
action = binary.BigEndian.Uint32(ev.Data)
}
return
}
type SendEvent struct {
To net.Addr
Data []byte
}

179
service/scraper/scraper.go Fichier normal
Voir le fichier

@ -0,0 +1,179 @@
package scraperService
import (
"github.com/ewhal/nyaa/config"
"github.com/ewhal/nyaa/db"
"github.com/ewhal/nyaa/model"
"github.com/ewhal/nyaa/util/log"
"net"
"net/url"
"time"
)
// MTU yes this is the ipv6 mtu
const MTU = 1500
// bittorrent scraper
type Scraper struct {
done chan int
sendQueue chan *SendEvent
recvQueue chan *RecvEvent
errQueue chan error
trackers map[string]*Bucket
ticker *time.Ticker
interval time.Duration
}
func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
sc = &Scraper{
done: make(chan int),
sendQueue: make(chan *SendEvent, 1024),
recvQueue: make(chan *RecvEvent, 1024),
errQueue: make(chan error),
trackers: make(map[string]*Bucket),
ticker: time.NewTicker(time.Second),
interval: time.Second * time.Duration(conf.IntervalSeconds),
}
for idx := range conf.Trackers {
err = sc.AddTracker(&conf.Trackers[idx])
if err != nil {
break
}
}
return
}
func (sc *Scraper) AddTracker(conf *config.ScrapeConfig) (err error) {
var u *url.URL
u, err = url.Parse(conf.URL)
if err == nil {
var ips []net.IP
ips, err = net.LookupIP(u.Hostname())
if err == nil {
// TODO: use more than 1 ip ?
addr := &net.UDPAddr{
IP: ips[0],
}
addr.Port, err = net.LookupPort("udp", u.Port())
if err == nil {
sc.trackers[addr.String()] = NewBucket(addr)
}
}
}
return
}
func (sc *Scraper) Close() (err error) {
close(sc.sendQueue)
close(sc.recvQueue)
close(sc.errQueue)
sc.ticker.Stop()
sc.done <- 1
return
}
func (sc *Scraper) runRecv(pc net.PacketConn) {
for {
var buff [MTU]byte
n, from, err := pc.ReadFrom(buff[:])
if err == nil {
log.Debugf("got %d from %s", n, from)
sc.recvQueue <- &RecvEvent{
From: from,
Data: buff[:n],
}
} else {
sc.errQueue <- err
}
}
}
func (sc *Scraper) runSend(pc net.PacketConn) {
for {
ev, ok := <-sc.sendQueue
if !ok {
return
}
log.Debugf("write %d to %s", len(ev.Data), ev.To)
pc.WriteTo(ev.Data, ev.To)
}
}
func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) {
go sc.runRecv(pc)
go sc.runSend(pc)
for {
var bucket *Bucket
ev, ok := <-sc.recvQueue
if !ok {
break
}
tid, err := ev.TID()
action, err := ev.Action()
log.Debugf("transaction = %d action = %d", tid, action)
if err == nil {
bucket, ok = sc.trackers[ev.From.String()]
if ok && bucket != nil {
bucket.VisitTransaction(tid, func(t *Transaction) {
if t == nil {
log.Warnf("no transaction %d", tid)
} else {
if t.GotData(ev.Data) {
err := t.Sync()
if err != nil {
log.Warnf("failed to sync swarm: %s", err)
}
t.Done()
log.Debugf("transaction %d done", tid)
} else {
sc.sendQueue <- t.SendEvent(ev.From)
}
}
})
} else {
log.Warnf("bucket not found for %s", ev.From)
}
}
}
return
}
func (sc *Scraper) Run() {
for {
<-sc.ticker.C
sc.Scrape()
}
}
func (sc *Scraper) Scrape() {
now := time.Now().Add(0 - sc.interval)
rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT 700", now).Rows()
if err == nil {
counter := 0
var scrape [70]model.Torrent
for rows.Next() {
idx := counter % 70
rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
counter++
if idx == 0 {
for _, b := range sc.trackers {
t := b.NewTransaction(scrape[:])
sc.sendQueue <- t.SendEvent(b.Addr)
}
}
}
rows.Close()
} else {
log.Warnf("failed to select torrents for scrape: %s", err)
}
}
func (sc *Scraper) Wait() {
<-sc.done
}

134
service/scraper/transaction.go Fichier normal
Voir le fichier

@ -0,0 +1,134 @@
package scraperService
import (
"encoding/binary"
"encoding/hex"
"net"
"time"
"github.com/ewhal/nyaa/db"
"github.com/ewhal/nyaa/model"
"github.com/ewhal/nyaa/util/log"
)
const stateSendID = 0
const stateRecvID = 1
const stateTransact = 2
const actionError = 3
const actionScrape = 2
const actionAnnounce = 1
const actionConnect = 0
// Transaction a scrape transaction on a udp tracker
type Transaction struct {
TransactionID uint32
ConnectionID uint64
bucket *Bucket
state uint8
swarms []model.Torrent
}
// Done marks this transaction as done and removes it from parent
func (t *Transaction) Done() {
t.bucket.access.Lock()
delete(t.bucket.transactions, t.TransactionID)
t.bucket.access.Unlock()
}
func (t *Transaction) handleScrapeReply(data []byte) {
data = data[8:]
now := time.Now()
for idx := range t.swarms {
t.swarms[idx].Seeders = binary.BigEndian.Uint32(data)
data = data[4:]
t.swarms[idx].Completed = binary.BigEndian.Uint32(data)
data = data[4:]
t.swarms[idx].Leechers = binary.BigEndian.Uint32(data)
data = data[4:]
t.swarms[idx].LastScrape = now
idx++
}
}
// Sync syncs models with database
func (t *Transaction) Sync() (err error) {
for idx := range t.swarms {
err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{
"seeders": t.swarms[idx].Seeders,
"leechers": t.swarms[idx].Leechers,
"completed": t.swarms[idx].Completed,
"last_scrape": t.swarms[idx].LastScrape,
}).Error
if err != nil {
break
}
}
return
}
// create send event
func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) {
ev = &SendEvent{
To: to,
}
if t.state == stateRecvID {
l := len(t.swarms) * 20
l += 16
ev.Data = make([]byte, l)
binary.BigEndian.PutUint64(ev.Data[:], t.ConnectionID)
binary.BigEndian.PutUint32(ev.Data[8:], 2)
binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
for idx := range t.swarms {
ih, err := hex.DecodeString(t.swarms[idx].Hash)
if err == nil && len(ih) == 20 {
copy(ev.Data[16+(idx*20):], ih)
}
}
t.state = stateTransact
} else if t.state == stateSendID {
ev.Data = make([]byte, 16)
binary.BigEndian.PutUint64(ev.Data, InitialConnectionID)
binary.BigEndian.PutUint32(ev.Data[8:], 0)
binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
t.state = stateRecvID
}
return
}
func (t *Transaction) handleError(msg string) {
log.Infof("scrape failed: %s", msg)
}
// handle data for transaction
func (t *Transaction) GotData(data []byte) (done bool) {
if len(data) > 4 {
cmd := binary.BigEndian.Uint32(data)
switch cmd {
case actionConnect:
if len(data) == 16 {
if t.state == stateRecvID {
t.ConnectionID = binary.BigEndian.Uint64(data[8:])
}
}
break
case actionScrape:
if len(data) == (12*len(t.swarms))+8 && t.state == stateTransact {
t.handleScrapeReply(data)
}
done = true
break
case actionError:
if len(data) == 12 {
t.handleError(string(data[4:12]))
}
default:
done = true
}
}
return
}

Voir le fichier

@ -30,11 +30,11 @@
<a style="margin: 5px;" aria-label="Report button" type="button" data-toggle="modal" data-target="#reportModal" class="btn btn-danger btn-lg">
<span class="glyphicon glyphicon-remove" aria-hidden="true"></span> Report!
</a>
{{ if HasAdmin $.User}}
<a href="{{ genRoute "mod_tdelete" }}?id={{ .ID }}" class="btn btn-danger btn-lg" onclick="if (!confirm('Are you sure?')) return false;"><i class="glyphicon glyphicon-trash"></i></a>
{{end}}
</div>
</div>