diff --git a/db/gorm.go b/db/gorm.go index 30aa729e..77fa33a6 100644 --- a/db/gorm.go +++ b/db/gorm.go @@ -11,6 +11,8 @@ import ( var ORM *gorm.DB +var IsSqlite bool + // GormInit init gorm ORM. func GormInit(conf *config.Config) (*gorm.DB, error) { db, openErr := gorm.Open(conf.DBType, conf.DBParams) @@ -19,6 +21,8 @@ func GormInit(conf *config.Config) (*gorm.DB, error) { return nil, openErr } + IsSqlite = conf.DBType == "sqlite" + connectionErr := db.DB().Ping() if connectionErr != nil { log.CheckError(connectionErr) diff --git a/service/scraper/bucket.go b/service/scraper/bucket.go index d9c85372..bf8c34f9 100644 --- a/service/scraper/bucket.go +++ b/service/scraper/bucket.go @@ -31,7 +31,7 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) { swarms: make([]model.Torrent, len(swarms)), state: stateSendID, } - copy(t.swarms, swarms) + copy(t.swarms[:], swarms[:]) b.transactions[id] = t b.access.Unlock() return diff --git a/service/scraper/scraper.go b/service/scraper/scraper.go index 61a5825e..28797dd8 100644 --- a/service/scraper/scraper.go +++ b/service/scraper/scraper.go @@ -36,9 +36,9 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) { recvQueue: make(chan *RecvEvent, 1024), errQueue: make(chan error), trackers: make(map[string]*Bucket), - ticker: time.NewTicker(time.Second), + ticker: time.NewTicker(time.Second * 10), interval: time.Second * time.Duration(conf.IntervalSeconds), - cleanup: time.NewTicker(time.Second), + cleanup: time.NewTicker(time.Minute), } if sc.PacketsPerSecond == 0 { @@ -181,7 +181,7 @@ func (sc *Scraper) Scrape(packets uint) { now := time.Now().Add(0 - sc.interval) // only scrape torretns uploaded within 90 days oldest := now.Add(0 - (time.Hour * 24 * 90)) - rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows() + rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE ( last_scrape IS NULL OR last_scrape < ? ) AND date > ? ORDER BY torrent_id DESC LIMIT ?", now, oldest, packets*ScrapesPerPacket).Rows() if err == nil { counter := 0 var scrape [ScrapesPerPacket]model.Torrent @@ -189,13 +189,21 @@ func (sc *Scraper) Scrape(packets uint) { idx := counter % ScrapesPerPacket rows.Scan(&scrape[idx].ID, &scrape[idx].Hash) counter++ - if idx == 0 { + if counter%ScrapesPerPacket == 0 { for _, b := range sc.trackers { t := b.NewTransaction(scrape[:]) sc.sendQueue <- t.SendEvent(b.Addr) } } } + idx := counter % ScrapesPerPacket + if idx > 0 { + for _, b := range sc.trackers { + t := b.NewTransaction(scrape[:idx]) + sc.sendQueue <- t.SendEvent(b.Addr) + } + } + log.Infof("scrape %d", counter) rows.Close() } else { diff --git a/service/scraper/transaction.go b/service/scraper/transaction.go index 7b7de953..aaf6ece3 100644 --- a/service/scraper/transaction.go +++ b/service/scraper/transaction.go @@ -53,18 +53,22 @@ func (t *Transaction) handleScrapeReply(data []byte) { } } +const pgQuery = "UPDATE torrents SET seeders = $1 , leechers = $2 , completed = $3 , last_scrape = $4 WHERE torrent_id = $5" +const sqliteQuery = "UPDATE torrents SET seeders = ? , leechers = ? , completed = ? , last_scrape = ? WHERE torrent_id = ?" + // Sync syncs models with database func (t *Transaction) Sync() (err error) { - for idx := range t.swarms { - err = db.ORM.Model(&t.swarms[idx]).Updates(map[string]interface{}{ - "seeders": t.swarms[idx].Seeders, - "leechers": t.swarms[idx].Leechers, - "completed": t.swarms[idx].Completed, - "last_scrape": t.swarms[idx].LastScrape, - }).Error - if err != nil { - break + q := pgQuery + if db.IsSqlite { + q = sqliteQuery + } + tx, e := db.ORM.DB().Begin() + err = e + if err == nil { + for idx := range t.swarms { + _, err = tx.Exec(q, t.swarms[idx].Seeders, t.swarms[idx].Leechers, t.swarms[idx].Completed, t.swarms[idx].LastScrape, t.swarms[idx].ID) } + tx.Commit() } return }