fix torrent swarm ordering and add timeouts for udp scrape
Cette révision appartient à :
Parent
7d98ebd7fe
révision
57895251a3
6 fichiers modifiés avec 104 ajouts et 20 suppressions
|
@ -19,6 +19,9 @@ const (
|
||||||
Date
|
Date
|
||||||
Downloads
|
Downloads
|
||||||
Size
|
Size
|
||||||
|
Seeders
|
||||||
|
Leechers
|
||||||
|
Completed
|
||||||
)
|
)
|
||||||
|
|
||||||
type Category struct {
|
type Category struct {
|
||||||
|
@ -44,5 +47,6 @@ type SearchParam struct {
|
||||||
Page int
|
Page int
|
||||||
UserID uint
|
UserID uint
|
||||||
Max uint
|
Max uint
|
||||||
|
NotNull string
|
||||||
Query string
|
Query string
|
||||||
}
|
}
|
||||||
|
|
1
main.go
1
main.go
|
@ -84,6 +84,7 @@ func RunScraper(conf *config.Config) {
|
||||||
signals.RegisterCloser(scraper)
|
signals.RegisterCloser(scraper)
|
||||||
// run udp scraper worker
|
// run udp scraper worker
|
||||||
for workers > 0 {
|
for workers > 0 {
|
||||||
|
log.Infof("starting up worker %d", workers)
|
||||||
go scraper.RunWorker(pc)
|
go scraper.RunWorker(pc)
|
||||||
workers--
|
workers--
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,15 +28,42 @@ func (b *Bucket) NewTransaction(swarms []model.Torrent) (t *Transaction) {
|
||||||
t = &Transaction{
|
t = &Transaction{
|
||||||
TransactionID: id,
|
TransactionID: id,
|
||||||
bucket: b,
|
bucket: b,
|
||||||
swarms: swarms,
|
swarms: make([]model.Torrent, len(swarms)),
|
||||||
state: stateSendID,
|
state: stateSendID,
|
||||||
}
|
}
|
||||||
|
copy(t.swarms, swarms)
|
||||||
b.transactions[id] = t
|
b.transactions[id] = t
|
||||||
b.access.Unlock()
|
b.access.Unlock()
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Bucket) ForEachTransaction(v func(uint32, *Transaction)) {
|
||||||
|
|
||||||
|
clone := make(map[uint32]*Transaction)
|
||||||
|
|
||||||
|
b.access.Lock()
|
||||||
|
|
||||||
|
for k := range b.transactions {
|
||||||
|
clone[k] = b.transactions[k]
|
||||||
|
}
|
||||||
|
|
||||||
|
b.access.Unlock()
|
||||||
|
|
||||||
|
for k := range clone {
|
||||||
|
v(k, clone[k])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Bucket) Forget(tid uint32) {
|
||||||
|
b.access.Lock()
|
||||||
|
_, ok := b.transactions[tid]
|
||||||
|
if ok {
|
||||||
|
delete(b.transactions, tid)
|
||||||
|
}
|
||||||
|
b.access.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) {
|
func (b *Bucket) VisitTransaction(tid uint32, v func(*Transaction)) {
|
||||||
b.access.Lock()
|
b.access.Lock()
|
||||||
t, ok := b.transactions[tid]
|
t, ok := b.transactions[tid]
|
||||||
|
|
|
@ -13,15 +13,20 @@ import (
|
||||||
// MTU yes this is the ipv6 mtu
|
// MTU yes this is the ipv6 mtu
|
||||||
const MTU = 1500
|
const MTU = 1500
|
||||||
|
|
||||||
|
// max number of scrapes per packet
|
||||||
|
const ScrapesPerPacket = 74
|
||||||
|
|
||||||
// bittorrent scraper
|
// bittorrent scraper
|
||||||
type Scraper struct {
|
type Scraper struct {
|
||||||
done chan int
|
done chan int
|
||||||
sendQueue chan *SendEvent
|
sendQueue chan *SendEvent
|
||||||
recvQueue chan *RecvEvent
|
recvQueue chan *RecvEvent
|
||||||
errQueue chan error
|
errQueue chan error
|
||||||
trackers map[string]*Bucket
|
trackers map[string]*Bucket
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
interval time.Duration
|
cleanup *time.Ticker
|
||||||
|
interval time.Duration
|
||||||
|
PacketsPerSecond uint
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
|
func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
|
||||||
|
@ -33,7 +38,13 @@ func New(conf *config.ScraperConfig) (sc *Scraper, err error) {
|
||||||
trackers: make(map[string]*Bucket),
|
trackers: make(map[string]*Bucket),
|
||||||
ticker: time.NewTicker(time.Second),
|
ticker: time.NewTicker(time.Second),
|
||||||
interval: time.Second * time.Duration(conf.IntervalSeconds),
|
interval: time.Second * time.Duration(conf.IntervalSeconds),
|
||||||
|
cleanup: time.NewTicker(time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sc.PacketsPerSecond == 0 {
|
||||||
|
sc.PacketsPerSecond = 10
|
||||||
|
}
|
||||||
|
|
||||||
for idx := range conf.Trackers {
|
for idx := range conf.Trackers {
|
||||||
err = sc.AddTracker(&conf.Trackers[idx])
|
err = sc.AddTracker(&conf.Trackers[idx])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -144,20 +155,37 @@ func (sc *Scraper) RunWorker(pc net.PacketConn) (err error) {
|
||||||
|
|
||||||
func (sc *Scraper) Run() {
|
func (sc *Scraper) Run() {
|
||||||
for {
|
for {
|
||||||
<-sc.ticker.C
|
select {
|
||||||
sc.Scrape()
|
case <-sc.ticker.C:
|
||||||
|
sc.Scrape(sc.PacketsPerSecond)
|
||||||
|
break
|
||||||
|
case <-sc.cleanup.C:
|
||||||
|
sc.removeStale()
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *Scraper) Scrape() {
|
func (sc *Scraper) removeStale() {
|
||||||
|
|
||||||
|
for k := range sc.trackers {
|
||||||
|
sc.trackers[k].ForEachTransaction(func(tid uint32, t *Transaction) {
|
||||||
|
if t == nil || t.IsTimedOut() {
|
||||||
|
sc.trackers[k].Forget(tid)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *Scraper) Scrape(packets uint) {
|
||||||
now := time.Now().Add(0 - sc.interval)
|
now := time.Now().Add(0 - sc.interval)
|
||||||
|
|
||||||
rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT 700", now).Rows()
|
rows, err := db.ORM.Raw("SELECT torrent_id, torrent_hash FROM torrents WHERE last_scrape IS NULL OR last_scrape < ? ORDER BY torrent_id DESC LIMIT ?", now, packets*ScrapesPerPacket).Rows()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
counter := 0
|
counter := 0
|
||||||
var scrape [70]model.Torrent
|
var scrape [ScrapesPerPacket]model.Torrent
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
idx := counter % 70
|
idx := counter % ScrapesPerPacket
|
||||||
rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
|
rows.Scan(&scrape[idx].ID, &scrape[idx].Hash)
|
||||||
counter++
|
counter++
|
||||||
if idx == 0 {
|
if idx == 0 {
|
||||||
|
|
|
@ -11,6 +11,9 @@ import (
|
||||||
"github.com/ewhal/nyaa/util/log"
|
"github.com/ewhal/nyaa/util/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TransactionTimeout 30 second timeout for transactions
|
||||||
|
const TransactionTimeout = time.Second * 30
|
||||||
|
|
||||||
const stateSendID = 0
|
const stateSendID = 0
|
||||||
const stateRecvID = 1
|
const stateRecvID = 1
|
||||||
const stateTransact = 2
|
const stateTransact = 2
|
||||||
|
@ -27,13 +30,12 @@ type Transaction struct {
|
||||||
bucket *Bucket
|
bucket *Bucket
|
||||||
state uint8
|
state uint8
|
||||||
swarms []model.Torrent
|
swarms []model.Torrent
|
||||||
|
lastData time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Done marks this transaction as done and removes it from parent
|
// Done marks this transaction as done and removes it from parent
|
||||||
func (t *Transaction) Done() {
|
func (t *Transaction) Done() {
|
||||||
t.bucket.access.Lock()
|
t.bucket.Forget(t.TransactionID)
|
||||||
delete(t.bucket.transactions, t.TransactionID)
|
|
||||||
t.bucket.access.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Transaction) handleScrapeReply(data []byte) {
|
func (t *Transaction) handleScrapeReply(data []byte) {
|
||||||
|
@ -95,6 +97,7 @@ func (t *Transaction) SendEvent(to net.Addr) (ev *SendEvent) {
|
||||||
binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
|
binary.BigEndian.PutUint32(ev.Data[12:], t.TransactionID)
|
||||||
t.state = stateRecvID
|
t.state = stateRecvID
|
||||||
}
|
}
|
||||||
|
t.lastData = time.Now()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,7 +107,7 @@ func (t *Transaction) handleError(msg string) {
|
||||||
|
|
||||||
// handle data for transaction
|
// handle data for transaction
|
||||||
func (t *Transaction) GotData(data []byte) (done bool) {
|
func (t *Transaction) GotData(data []byte) (done bool) {
|
||||||
|
t.lastData = time.Now()
|
||||||
if len(data) > 4 {
|
if len(data) > 4 {
|
||||||
cmd := binary.BigEndian.Uint32(data)
|
cmd := binary.BigEndian.Uint32(data)
|
||||||
switch cmd {
|
switch cmd {
|
||||||
|
@ -132,3 +135,8 @@ func (t *Transaction) GotData(data []byte) (done bool) {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Transaction) IsTimedOut() bool {
|
||||||
|
return t.lastData.Add(TransactionTimeout).Before(time.Now())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) (
|
||||||
search.Page = pagenum
|
search.Page = pagenum
|
||||||
search.Query = r.URL.Query().Get("q")
|
search.Query = r.URL.Query().Get("q")
|
||||||
userID, _ := strconv.Atoi(r.URL.Query().Get("userID"))
|
userID, _ := strconv.Atoi(r.URL.Query().Get("userID"))
|
||||||
search.UserID = uint(userID)
|
search.UserID = uint(userID)
|
||||||
|
|
||||||
switch s := r.URL.Query().Get("s"); s {
|
switch s := r.URL.Query().Get("s"); s {
|
||||||
case "1":
|
case "1":
|
||||||
|
@ -75,22 +75,36 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) (
|
||||||
case "1":
|
case "1":
|
||||||
search.Sort = common.Name
|
search.Sort = common.Name
|
||||||
orderBy += "torrent_name"
|
orderBy += "torrent_name"
|
||||||
|
break
|
||||||
case "2":
|
case "2":
|
||||||
search.Sort = common.Date
|
search.Sort = common.Date
|
||||||
orderBy += "date"
|
orderBy += "date"
|
||||||
|
break
|
||||||
case "3":
|
case "3":
|
||||||
search.Sort = common.Downloads
|
search.Sort = common.Downloads
|
||||||
orderBy += "downloads"
|
orderBy += "downloads"
|
||||||
|
break
|
||||||
case "4":
|
case "4":
|
||||||
search.Sort = common.Size
|
search.Sort = common.Size
|
||||||
orderBy += "filesize"
|
orderBy += "filesize"
|
||||||
|
break
|
||||||
case "5":
|
case "5":
|
||||||
|
search.Sort = common.Seeders
|
||||||
orderBy += "seeders"
|
orderBy += "seeders"
|
||||||
|
search.NotNull += "seeders IS NOT NULL "
|
||||||
|
break
|
||||||
case "6":
|
case "6":
|
||||||
|
search.Sort = common.Leechers
|
||||||
orderBy += "leechers"
|
orderBy += "leechers"
|
||||||
|
search.NotNull += "leechers IS NOT NULL "
|
||||||
|
break
|
||||||
case "7":
|
case "7":
|
||||||
|
search.Sort = common.Completed
|
||||||
orderBy += "completed"
|
orderBy += "completed"
|
||||||
|
search.NotNull += "completed IS NOT NULL "
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
|
search.Sort = common.ID
|
||||||
orderBy += "torrent_id"
|
orderBy += "torrent_id"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +143,9 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) (
|
||||||
}
|
}
|
||||||
parameters.Params = append(parameters.Params, strconv.Itoa(int(search.Status)+1))
|
parameters.Params = append(parameters.Params, strconv.Itoa(int(search.Status)+1))
|
||||||
}
|
}
|
||||||
|
if len(search.NotNull) > 0 {
|
||||||
|
conditions = append(conditions, search.NotNull)
|
||||||
|
}
|
||||||
searchQuerySplit := strings.Fields(search.Query)
|
searchQuerySplit := strings.Fields(search.Query)
|
||||||
for i, word := range searchQuerySplit {
|
for i, word := range searchQuerySplit {
|
||||||
firstRune, _ := utf8.DecodeRuneInString(word)
|
firstRune, _ := utf8.DecodeRuneInString(word)
|
||||||
|
|
Référencer dans un nouveau ticket