fix scraper, optimize updates so it doesn't suck massive ass (#367)
* fix scraper, optimize updates so it doesn't suck massive ass
* fucking ass
Parent: aed45bc29a
Commit: 100ecffda7
@@ -11,6 +11,8 @@ import (
 
 var ORM *gorm.DB
 
+var IsSqlite bool
+
 // GormInit init gorm ORM.
 func GormInit(conf *config.Config) (*gorm.DB, error) {
 	db, openErr := gorm.Open(conf.DBType, conf.DBParams)
@@ -19,6 +21,8 @@ func GormInit(conf *config.Config) (*gorm.DB, error) {
 		return nil, openErr
 	}
 
+	IsSqlite = conf.DBType == "sqlite"
+
 	connectionErr := db.DB().Ping()
 	if connectionErr != nil {
 		log.CheckError(connectionErr)
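Note: `IsSqlite` is exported from the db package and set once in `GormInit`, so hot paths can branch on the SQL dialect without re-reading config. Its consumer is the rewritten `Transaction.Sync` further down, which uses it to choose between PostgreSQL's numbered placeholders ($1..$n) and SQLite's positional ? marks.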
@@ -31,7 +31,7 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
 		swarms: make([]model.Torrent, len(swarms)),
 		state:  stateSendID,
 	}
-	copy(t.swarms, swarms)
+	copy(t.swarms[:], swarms[:])
 	b.transactions[id] = t
 	b.access.Unlock()
 	return
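Note: since `t.swarms` and `swarms` are both slices, `copy(t.swarms[:], swarms[:])` is equivalent to `copy(t.swarms, swarms)`; re-slicing a slice with [:] is a no-op, so this hunk changes style, not behavior.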
@@ -36,9 +36,9 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
 		recvQueue: make(chan *RecvEvent, 1024),
 		errQueue:  make(chan error),
 		trackers:  make(map[string]*Bucket),
-		ticker:    time.NewTicker(time.Second),
+		ticker:    time.NewTicker(time.Second * 10),
 		interval:  time.Second * time.Duration(conf.IntervalSeconds),
-		cleanup:   time.NewTicker(time.Second),
+		cleanup:   time.NewTicker(time.Minute),
 	}
 
 	if sc.PacketsPerSecond == 0 {
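Note: both tickers previously fired every second. The scrape pass now wakes every 10 seconds and the stale-transaction cleanup once per minute, which cuts idle wakeups without hurting a scraper whose useful work is rate-limited anyway. A self-contained sketch of the pattern (the cadences come from this diff; the loop body is illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	scrape := time.NewTicker(10 * time.Second) // was time.Second
	cleanup := time.NewTicker(time.Minute)     // was time.Second
	defer scrape.Stop()
	defer cleanup.Stop()
	for {
		select {
		case <-scrape.C:
			fmt.Println("send a round of scrape packets")
		case <-cleanup.C:
			fmt.Println("reap stale tracker transactions")
		}
	}
}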
@@ -181,7 +181,7 @@ func (sc *Scraper) Scrape(packets uint) {
 	now := time.Now().Add(0 - sc.interval)
 	// only scrape torrents uploaded within 90 days
 	oldest := now.Add(0 - (time.Hour * 24 * 90))
-	rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows()
+	rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE ( last_scrape IS NULL OR last_scrape < ? ) AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows()
 	if err == nil {
 		counter := 0
 		var scrape [ScrapesPerPacket]model.Torrent
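Note: this is the main query fix. AND binds tighter than OR in SQL, so the old WHERE clause parsed as

	last_scrape IS NULL OR (last_scrape < now AND date > oldest)

which matched every never-scraped torrent regardless of age; combined with ORDER BY ... LIMIT, the scraper kept re-selecting the same rows. The parenthesized form applies the 90-day cutoff to both branches:

	(last_scrape IS NULL OR last_scrape < now) AND date > oldest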
@@ -189,13 +189,21 @@ func (sc *Scraper) Scrape(packets uint) {
 			idx := counter % ScrapesPerPacket
 			rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
 			counter++
-			if idx == 0 {
+			if counter%ScrapesPerPacket == 0 {
 				for _, b := range sc.trackers {
 					t := b.NewTransaction(scrape[:])
 					sc.sendQueue <- t.SendEvent(b.Addr)
 				}
 			}
 		}
+		idx := counter % ScrapesPerPacket
+		if idx > 0 {
+			for _, b := range sc.trackers {
+				t := b.NewTransaction(scrape[:idx])
+				sc.sendQueue <- t.SendEvent(b.Addr)
+			}
+		}
+		log.Infof("scrape %d", counter)
 		rows.Close()
 
 	} else {
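Note: two bugs die here. `idx` is computed before `counter++`, so the old `if idx == 0` fired on the very first row (dispatching a buffer that was almost entirely zero-valued) and on later flushes mixed one fresh row with the previous batch's leftovers; worse, a final partial batch was never sent at all. The new `counter%ScrapesPerPacket == 0` test flushes exactly when the buffer is full, and the trailing `if idx > 0` block sends the remainder as `scrape[:idx]`. A self-contained sketch of the corrected batch-and-flush shape (the batch size 74 is an assumed stand-in for ScrapesPerPacket):

package main

import "fmt"

const perPacket = 74 // stand-in for ScrapesPerPacket; value assumed

func main() {
	var batch [perPacket]int
	counter := 0
	for row := 1; row <= 180; row++ { // 180 fake result rows
		batch[counter%perPacket] = row
		counter++
		if counter%perPacket == 0 { // buffer exactly full: flush it
			fmt.Println("flush full packet of", perPacket)
		}
	}
	if idx := counter % perPacket; idx > 0 { // leftover tail the old code dropped
		fmt.Println("flush partial packet of", len(batch[:idx]))
	}
}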
@@ -53,18 +53,22 @@ func (t *Transaction) handleScrapeReply(data []byte) {
 	}
 }
 
+const pgQuery = "UPDATE torrents SET seeders = $1 , leechers = $2 , completed = $3 , last_scrape = $4 WHERE torrent_id = $5"
+const sqliteQuery = "UPDATE torrents SET seeders = ? , leechers = ? , completed = ? , last_scrape = ? WHERE torrent_id = ?"
+
 // Sync syncs models with database
 func (t *Transaction) Sync() (err error) {
-	for idx := range t.swarms {
-		err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{
-			"seeders":     t.swarms[idx].Seeders,
-			"leechers":    t.swarms[idx].Leechers,
-			"completed":   t.swarms[idx].Completed,
-			"last_scrape": t.swarms[idx].LastScrape,
-		}).Error
-		if err != nil {
-			break
+	q := pgQuery
+	if db.IsSqlite {
+		q = sqliteQuery
+	}
+	tx, e := db.ORM.DB().Begin()
+	err = e
+	if err == nil {
+		for idx := range t.swarms {
+			_, err = tx.Exec(q, t.swarms[idx].Seeders, t.swarms[idx].Leechers, t.swarms[idx].Completed, t.swarms[idx].LastScrape, t.swarms[idx].ID)
 		}
+		tx.Commit()
 	}
 	return
 }
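Note: this is the "optimize updates" half of the commit. The old Sync issued one gorm Updates call per torrent, each its own statement and implicit transaction; the new one writes every swarm row through a single explicit transaction with a raw UPDATE in the dialect the driver expects, which on SQLite in particular means one commit instead of N. One caveat: tx.Commit() runs even if an individual Exec failed, so a partial batch can still land. A self-contained sketch of the same shape with a prepared statement and rollback-on-error (the type, fields, and function are illustrative, not the project's API):

package scraper

import (
	"database/sql"
	"time"
)

// swarm mirrors only the fields Sync writes back; illustrative.
type swarm struct {
	ID                           uint
	Seeders, Leechers, Completed uint32
	LastScrape                   time.Time
}

// bulkUpdate runs every UPDATE inside one transaction, preparing the
// statement once so the query text is parsed a single time.
func bulkUpdate(db *sql.DB, query string, swarms []swarm) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	stmt, err := tx.Prepare(query)
	if err != nil {
		tx.Rollback()
		return err
	}
	defer stmt.Close()
	for i := range swarms {
		s := &swarms[i]
		if _, err := stmt.Exec(s.Seeders, s.Leechers, s.Completed, s.LastScrape, s.ID); err != nil {
			tx.Rollback() // unlike the diff, give up on the whole batch
			return err
		}
	}
	return tx.Commit()
}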