Albirew/nyaa-pantsu

fix scraper, optimize updates so it doesn't suck massive ass (#367)

* fix scraper, optimize updates so it doesn't suck massive ass

* fucking ass
This commit is contained in:
Jeff 2017-05-11 15:06:47 -04:00, reviewed by Austin
Parent 01d48cce0e
Commit c1392a52d9
4 changed files with 30 additions and 14 deletions

View file

@@ -11,6 +11,8 @@ import (
 var ORM *gorm.DB
+var IsSqlite bool
+
 // GormInit init gorm ORM.
 func GormInit(conf *config.Config) (*gorm.DB, error) {
 	db, openErr := gorm.Open(conf.DBType, conf.DBParams)
@@ -19,6 +21,8 @@ func GormInit(conf *config.Config) (*gorm.DB, error) {
 		return nil, openErr
 	}
+	IsSqlite = conf.DBType == "sqlite"
+
 	connectionErr := db.DB().Ping()
 	if connectionErr != nil {
 		log.CheckError(connectionErr)
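The new IsSqlite flag records the dialect once at startup so later code (see the Sync rewrite below) can pick the right placeholder syntax without re-reading configuration. A minimal sketch of the same pattern outside gorm, using only database/sql; the initDB helper and driver/dsn parameters are illustrative, not part of the commit:

package db

import "database/sql"

// IsSqlite is set once at init; hot paths branch on it instead of
// re-parsing configuration on every query.
var IsSqlite bool

// initDB is a hypothetical helper mirroring GormInit's shape.
func initDB(driver, dsn string) (*sql.DB, error) {
	conn, err := sql.Open(driver, dsn)
	if err != nil {
		return nil, err
	}
	IsSqlite = driver == "sqlite"
	// Ping eagerly so a bad DSN fails at startup rather than on first use.
	if err := conn.Ping(); err != nil {
		return nil, err
	}
	return conn, nil
}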

View file

@@ -31,7 +31,7 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
 		swarms: make([]model.Torrent, len(swarms)),
 		state:  stateSendID,
 	}
-	copy(t.swarms, swarms)
+	copy(t.swarms[:], swarms[:])
 	b.transactions[id] = t
 	b.access.Unlock()
 	return
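For context, copy on Go slices already copies min(len(dst), len(src)) elements into dst's own backing array; re-slicing a slice with [:] is a no-op, so this change is cosmetic (the [:] form only matters when the operand is an array). A quick demonstration:

package main

import "fmt"

func main() {
	src := []int{1, 2, 3}
	dst := make([]int, len(src))
	n := copy(dst, src) // identical to copy(dst[:], src[:]) for slices
	src[0] = 99         // dst holds its own backing array, so it is unaffected
	fmt.Println(n, dst) // 3 [1 2 3]
}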

View file

@@ -36,9 +36,9 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
 		recvQueue: make(chan *RecvEvent, 1024),
 		errQueue:  make(chan error),
 		trackers:  make(map[string]*Bucket),
-		ticker:    time.NewTicker(time.Second),
+		ticker:    time.NewTicker(time.Second * 10),
 		interval:  time.Second * time.Duration(conf.IntervalSeconds),
-		cleanup:   time.NewTicker(time.Second),
+		cleanup:   time.NewTicker(time.Minute),
 	}
 	if sc.PacketsPerSecond == 0 {
@@ -181,7 +181,7 @@ func (sc *Scraper) Scrape(packets uint) {
 	now := time.Now().Add(0 - sc.interval)
 	// only scrape torrents uploaded within 90 days
 	oldest := now.Add(0 - (time.Hour * 24 * 90))
-	rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows()
+	rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE ( last_scrape IS NULL OR last_scrape < ? ) AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows()
 	if err == nil {
 		counter := 0
 		var scrape [ScrapesPerPacket]model.Torrent
@@ -189,13 +189,21 @@ func (sc *Scraper) Scrape(packets uint) {
 			idx := counter % ScrapesPerPacket
 			rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
 			counter++
-			if idx == 0 {
+			if counter%ScrapesPerPacket == 0 {
 				for _, b := range sc.trackers {
 					t := b.NewTransaction(scrape[:])
 					sc.sendQueue <- t.SendEvent(b.Addr)
 				}
 			}
 		}
+		idx := counter % ScrapesPerPacket
+		if idx > 0 {
+			for _, b := range sc.trackers {
+				t := b.NewTransaction(scrape[:idx])
+				sc.sendQueue <- t.SendEvent(b.Addr)
+			}
+		}
 		log.Infof("scrape %d", counter)
 		rows.Close()
 	} else {
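Two fixes here. The query needed parentheses because SQL's AND binds tighter than OR: the old WHERE clause parsed as last_scrape IS NULL OR (last_scrape < ? AND date > ?), which kept re-selecting every never-scraped torrent regardless of age. The loop fix is the classic batch-and-flush pattern: test fullness after the increment (counter%ScrapesPerPacket == 0, not the stale pre-increment idx, which is zero one slot too early), then flush the trailing partial batch so the last few rows are not silently dropped. A self-contained sketch of that pattern, with flush standing in for building a transaction and queueing the scrape packet:

package main

import "fmt"

const batchSize = 70 // stand-in for a ScrapesPerPacket-style constant

// flush stands in for NewTransaction + sendQueue in the diff.
func flush(ids []int) { fmt.Printf("flushing %d ids\n", len(ids)) }

func main() {
	var buf [batchSize]int
	counter := 0
	for id := 1; id <= 165; id++ { // stand-in for rows.Next()
		buf[counter%batchSize] = id
		counter++
		// The buffer is full exactly when the post-increment count is a
		// multiple of batchSize.
		if counter%batchSize == 0 {
			flush(buf[:])
		}
	}
	// Flush the trailing partial batch so the last rows are not lost.
	if rem := counter % batchSize; rem > 0 {
		flush(buf[:rem])
	}
}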

View file

@@ -53,18 +53,22 @@ func (t *Transaction) handleScrapeReply(data []byte) {
 	}
 }
 
+const pgQuery = "UPDATE torrents SET seeders = $1 , leechers = $2 , completed = $3 , last_scrape = $4 WHERE torrent_id = $5"
+const sqliteQuery = "UPDATE torrents SET seeders = ? , leechers = ? , completed = ? , last_scrape = ? WHERE torrent_id = ?"
+
 // Sync syncs models with database
 func (t *Transaction) Sync() (err error) {
-	for idx := range t.swarms {
-		err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{
-			"seeders":     t.swarms[idx].Seeders,
-			"leechers":    t.swarms[idx].Leechers,
-			"completed":   t.swarms[idx].Completed,
-			"last_scrape": t.swarms[idx].LastScrape,
-		}).Error
-		if err != nil {
-			break
-		}
+	q := pgQuery
+	if db.IsSqlite {
+		q = sqliteQuery
 	}
+	tx, e := db.ORM.DB().Begin()
+	err = e
+	if err == nil {
+		for idx := range t.swarms {
+			_, err = tx.Exec(q, t.swarms[idx].Seeders, t.swarms[idx].Leechers, t.swarms[idx].Completed, t.swarms[idx].LastScrape, t.swarms[idx].ID)
+		}
+		tx.Commit()
+	}
 	return
 }
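This rewrite replaces one gorm Updates call per torrent, each running in its own implicit transaction, with raw UPDATEs batched inside a single explicit transaction, choosing $n placeholders for Postgres and ? for SQLite via the new db.IsSqlite flag. A standalone sketch of the same approach with database/sql; the Swarm type and syncSwarms helper are illustrative, and unlike the commit this version rolls back on the first failed row instead of committing whatever succeeded:

package main

import (
	"database/sql"
	"time"
)

// Swarm mirrors the model.Torrent fields the diff updates.
type Swarm struct {
	ID                           uint
	Seeders, Leechers, Completed uint32
	LastScrape                   time.Time
}

const (
	pgQuery     = "UPDATE torrents SET seeders = $1, leechers = $2, completed = $3, last_scrape = $4 WHERE torrent_id = $5"
	sqliteQuery = "UPDATE torrents SET seeders = ?, leechers = ?, completed = ?, last_scrape = ? WHERE torrent_id = ?"
)

// syncSwarms is an illustrative stand-in for Transaction.Sync.
func syncSwarms(db *sql.DB, isSqlite bool, swarms []Swarm) error {
	q := pgQuery
	if isSqlite {
		q = sqliteQuery
	}
	// One explicit transaction instead of one implicit transaction per row.
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	for i := range swarms {
		s := &swarms[i]
		if _, err := tx.Exec(q, s.Seeders, s.Leechers, s.Completed, s.LastScrape, s.ID); err != nil {
			tx.Rollback() // stricter than the commit, which ignores per-row errors
			return err
		}
	}
	return tx.Commit()
}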