make it work
Cette révision appartient à :
Parent
3d8727d1e3
révision
604cf57677
8 fichiers modifiés avec 70 ajouts et 51 suppressions
1
main.go
1
main.go
|
@ -128,7 +128,6 @@ func main() {
|
|||
}
|
||||
}
|
||||
if *mode == "scraper" {
|
||||
log.Init("DEVELOPMENT")
|
||||
RunScraper(conf)
|
||||
} else if *mode == "webapp" {
|
||||
RunServer(conf)
|
||||
|
|
|
@ -20,19 +20,19 @@ type Feed struct {
|
|||
}
|
||||
|
||||
type Torrent struct {
|
||||
ID uint `gorm:"column:torrent_id;primary_key"`
|
||||
Name string `gorm:"column:torrent_name"`
|
||||
Hash string `gorm:"column:torrent_hash"`
|
||||
Category int `gorm:"column:category"`
|
||||
SubCategory int `gorm:"column:sub_category"`
|
||||
Status int `gorm:"column:status"`
|
||||
Date int64 `gorm:"column:date"`
|
||||
UploaderID uint `gorm:"column:uploader"`
|
||||
Downloads int `gorm:"column:downloads"`
|
||||
Stardom int `gorm:"column:stardom"`
|
||||
Filesize int64 `gorm:"column:filesize"`
|
||||
Description string `gorm:"column:description"`
|
||||
WebsiteLink string `gorm:"column:website_link"`
|
||||
ID uint `gorm:"column:torrent_id;primary_key"`
|
||||
Name string `gorm:"column:torrent_name"`
|
||||
Hash string `gorm:"column:torrent_hash"`
|
||||
Category int `gorm:"column:category"`
|
||||
SubCategory int `gorm:"column:sub_category"`
|
||||
Status int `gorm:"column:status"`
|
||||
Date time.Time `gorm:"column:date"`
|
||||
UploaderID uint `gorm:"column:uploader"`
|
||||
Downloads int `gorm:"column:downloads"`
|
||||
Stardom int `gorm:"column:stardom"`
|
||||
Filesize int64 `gorm:"column:filesize"`
|
||||
Description string `gorm:"column:description"`
|
||||
WebsiteLink string `gorm:"column:website_link"`
|
||||
DeletedAt *time.Time
|
||||
|
||||
Uploader *User `gorm:"ForeignKey:uploader"`
|
||||
|
@ -40,10 +40,10 @@ type Torrent struct {
|
|||
OldComments []OldComment `gorm:"ForeignKey:torrent_id"`
|
||||
Comments []Comment `gorm:"ForeignKey:torrent_id"`
|
||||
|
||||
Seeders uint32 `gorm:"column:seeders"`
|
||||
Leechers uint32 `gorm:"column:leechers"`
|
||||
Completed uint32 `gorm:"column:completed"`
|
||||
LastScrape int64 `gorm:"column:last_scrape"`
|
||||
Seeders uint32 `gorm:"column:seeders"`
|
||||
Leechers uint32 `gorm:"column:leechers"`
|
||||
Completed uint32 `gorm:"column:completed"`
|
||||
LastScrape time.Time `gorm:"column:last_scrape"`
|
||||
}
|
||||
|
||||
// Returns the total size of memory recursively allocated for this struct
|
||||
|
@ -162,7 +162,7 @@ func (t *Torrent) ToJSON() TorrentJSON {
|
|||
Name: t.Name,
|
||||
Status: t.Status,
|
||||
Hash: t.Hash,
|
||||
Date: time.Unix(t.Date, 0).Format(time.RFC3339),
|
||||
Date: t.Date.Format(time.RFC3339),
|
||||
Filesize: util.FormatFilesize2(t.Filesize),
|
||||
Description: util.MarkdownToHTML(t.Description),
|
||||
Comments: commentsJSON,
|
||||
|
@ -177,7 +177,7 @@ func (t *Torrent) ToJSON() TorrentJSON {
|
|||
TorrentLink: util.Safe(torrentlink),
|
||||
Leechers: t.Leechers,
|
||||
Seeders: t.Seeders,
|
||||
LastScrape: time.Unix(t.LastScrape, 0),
|
||||
LastScrape: t.LastScrape,
|
||||
}
|
||||
|
||||
return res
|
||||
|
|
|
@ -132,7 +132,7 @@ func ApiUploadHandler(w http.ResponseWriter, r *http.Request) {
|
|||
SubCategory: upload.SubCategory,
|
||||
Status: 1,
|
||||
Hash: upload.Hash,
|
||||
Date: time.Now().Unix(),
|
||||
Date: time.Now(),
|
||||
Filesize: 0, //?
|
||||
Description: upload.Description,
|
||||
UploaderID: user.ID,
|
||||
|
|
|
@ -19,7 +19,7 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) {
|
|||
createdAsTime := time.Now()
|
||||
|
||||
if len(torrents) > 0 {
|
||||
createdAsTime = time.Unix(torrents[0].Date, 0)
|
||||
createdAsTime = torrents[0].Date
|
||||
}
|
||||
feed := &feeds.Feed{
|
||||
Title: "Nyaa Pantsu",
|
||||
|
@ -37,8 +37,8 @@ func RSSHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Title: torrents[i].Name,
|
||||
Link: &feeds.Link{Href: string(torrentJSON.Magnet)},
|
||||
Description: "",
|
||||
Created: time.Unix(torrents[0].Date, 0),
|
||||
Updated: time.Unix(torrents[0].Date, 0),
|
||||
Created: torrents[0].Date,
|
||||
Updated: torrents[0].Date,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ func UploadHandler(w http.ResponseWriter, r *http.Request) {
|
|||
SubCategory: uploadForm.SubCategoryID,
|
||||
Status: 1,
|
||||
Hash: uploadForm.Infohash,
|
||||
Date: time.Now().Unix(),
|
||||
Date: time.Now(),
|
||||
Filesize: uploadForm.Filesize,
|
||||
Description: uploadForm.Description,
|
||||
UploaderID: user.ID}
|
||||
|
|
|
@ -3,6 +3,7 @@ package scraperService
|
|||
import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/ewhal/nyaa/model"
|
||||
)
|
||||
|
@ -11,12 +12,14 @@ const InitialConnectionID = 0x41727101980
|
|||
|
||||
type Bucket struct {
|
||||
Addr net.Addr
|
||||
access sync.Mutex
|
||||
transactions map[uint32]*Transaction
|
||||
}
|
||||
|
||||
func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
|
||||
id := rand.Uint32()
|
||||
// get good id
|
||||
b.access.Lock()
|
||||
_, ok := b.transactions[id]
|
||||
for ok {
|
||||
id = rand.Uint32()
|
||||
|
@ -24,18 +27,22 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
|
|||
}
|
||||
t = &Transaction{
|
||||
TransactionID: id,
|
||||
bucket: b,
|
||||
swarms: swarms,
|
||||
state: stateSendID,
|
||||
}
|
||||
b.transactions[id] = t
|
||||
b.access.Unlock()
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) {
|
||||
b.access.Lock()
|
||||
t, ok := b.transactions[tid]
|
||||
b.access.Unlock()
|
||||
if ok {
|
||||
go v(t)
|
||||
v(t)
|
||||
} else {
|
||||
v(nil)
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
// MTU yes this is the ipv6 mtu
|
||||
const MTU = 1488
|
||||
const MTU = 1500
|
||||
|
||||
// bittorrent scraper
|
||||
type Scraper struct {
|
||||
|
@ -31,7 +31,7 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
|
|||
recvQueue: make(chan *RecvEvent, 1028),
|
||||
errQueue: make(chan error),
|
||||
trackers: make(map[string]*Bucket),
|
||||
ticker: time.NewTicker(time.Minute),
|
||||
ticker: time.NewTicker(time.Second),
|
||||
interval: time.Second * time.Duration(conf.IntervalSeconds),
|
||||
}
|
||||
for idx := range conf.Trackers {
|
||||
|
@ -127,6 +127,7 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) {
|
|||
log.Warnf("failed to sync swarm: %s", err)
|
||||
}
|
||||
t.Done()
|
||||
log.Debugf("transaction %d done", tid)
|
||||
} else {
|
||||
sc.sendQueue <- t.SendEvent(ev.From)
|
||||
}
|
||||
|
@ -142,7 +143,6 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) {
|
|||
}
|
||||
|
||||
func (sc *Scraper) Run() {
|
||||
sc.Scrape()
|
||||
for {
|
||||
<-sc.ticker.C
|
||||
sc.Scrape()
|
||||
|
@ -150,29 +150,25 @@ func (sc *Scraper) Run() {
|
|||
}
|
||||
|
||||
func (sc *Scraper) Scrape() {
|
||||
now := time.Now().Add(0 - sc.interval)
|
||||
|
||||
swarms := make([]model.Torrent, 0, 128)
|
||||
now := time.Now().Add(0 - sc.interval).Unix()
|
||||
err := db.ORM.Where("last_scrape < ?", now).Or("last_scrape IS NULL").Find(&swarms).Error
|
||||
rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT 700", now).Rows()
|
||||
if err == nil {
|
||||
for swarms != nil {
|
||||
var scrape []model.Torrent
|
||||
if len(swarms) > 74 {
|
||||
scrape = swarms[:74]
|
||||
swarms = swarms[74:]
|
||||
} else {
|
||||
scrape = swarms
|
||||
swarms = nil
|
||||
}
|
||||
log.Infof("scraping %d", len(scrape))
|
||||
if len(scrape) > 0 {
|
||||
counter := 0
|
||||
var scrape [70]model.Torrent
|
||||
for rows.Next() {
|
||||
idx := counter % 70
|
||||
rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
|
||||
counter++
|
||||
if idx == 0 {
|
||||
for _, b := range sc.trackers {
|
||||
t := b.NewTransaction(scrape)
|
||||
log.Debugf("new transaction %d", t.TransactionID)
|
||||
t := b.NewTransaction(scrape[:])
|
||||
sc.sendQueue <- t.SendEvent(b.Addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
rows.Close()
|
||||
|
||||
} else {
|
||||
log.Warnf("failed to select torrents for scrape: %s", err)
|
||||
}
|
||||
|
|
|
@ -31,23 +31,39 @@ type Transaction struct {
|
|||
|
||||
// Done marks this transaction as done and removes it from parent
|
||||
func (t *Transaction) Done() {
|
||||
t.bucket.access.Lock()
|
||||
delete(t.bucket.transactions, t.TransactionID)
|
||||
t.bucket.access.Unlock()
|
||||
}
|
||||
|
||||
func (t *Transaction) handleScrapeReply(data []byte) {
|
||||
data = data[8:]
|
||||
now := time.Now().Unix()
|
||||
now := time.Now()
|
||||
for idx := range t.swarms {
|
||||
t.swarms[idx].Seeders = binary.BigEndian.Uint32(data[:idx*12])
|
||||
t.swarms[idx].Completed = binary.BigEndian.Uint32(data[:(idx*12)+4])
|
||||
t.swarms[idx].Leechers = binary.BigEndian.Uint32(data[:(idx*12)+8])
|
||||
t.swarms[idx].Seeders = binary.BigEndian.Uint32(data)
|
||||
data = data[4:]
|
||||
t.swarms[idx].Completed = binary.BigEndian.Uint32(data)
|
||||
data = data[4:]
|
||||
t.swarms[idx].Leechers = binary.BigEndian.Uint32(data)
|
||||
data = data[4:]
|
||||
t.swarms[idx].LastScrape = now
|
||||
idx++
|
||||
}
|
||||
}
|
||||
|
||||
// Sync syncs models with database
|
||||
func (t *Transaction) Sync() (err error) {
|
||||
err = db.ORM.Update(t.swarms).Error
|
||||
for idx := range t.swarms {
|
||||
err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{
|
||||
"seeders": t.swarms[idx].Seeders,
|
||||
"leechers": t.swarms[idx].Leechers,
|
||||
"completed": t.swarms[idx].Completed,
|
||||
"last_scrape": t.swarms[idx].LastScrape,
|
||||
}).Error
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -71,6 +87,7 @@ func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) {
|
|||
copy(ev.Data[16+(idx*20):], ih)
|
||||
}
|
||||
}
|
||||
t.state = stateTransact
|
||||
} else if t.state == stateSendID {
|
||||
ev.Data = make([]byte, 16)
|
||||
binary.BigEndian.PutUint64(ev.Data, InitialConnectionID)
|
||||
|
@ -100,7 +117,7 @@ func (t *Transaction) GotData(data []byte) (done bool) {
|
|||
break
|
||||
case actionScrape:
|
||||
if len(data) == (12*len(t.swarms))+8 && t.state == stateTransact {
|
||||
t.handleScrapeReply(data[8:])
|
||||
t.handleScrapeReply(data)
|
||||
}
|
||||
done = true
|
||||
break
|
||||
|
|
Référencer dans un nouveau ticket