Use sync.WaitGroup for FetchOperations
Cette révision appartient à :
Parent
a068984af3
révision
fb677d4d9d
3 fichiers modifiés avec 25 ajouts et 6 suppressions
5
main.go
5
main.go
|
@ -104,14 +104,15 @@ func RunFilesizeFetcher(conf *config.Config) {
|
|||
}
|
||||
|
||||
signals.RegisterCloser(fetcher)
|
||||
fetcher.Run()
|
||||
fetcher.RunAsync()
|
||||
fetcher.Wait()
|
||||
}
|
||||
|
||||
func main() {
|
||||
conf := config.New()
|
||||
processFlags := conf.BindFlags()
|
||||
defaults := flag.Bool("print-defaults", false, "print the default configuration file on stdout")
|
||||
mode := flag.String("mode", "webapp", "which mode to run daemon in, either webapp or scraper")
|
||||
mode := flag.String("mode", "webapp", "which mode to run daemon in, either webapp, scraper or filesize_fetcher")
|
||||
flag.Float64Var(&conf.Cache.Size, "c", config.DefaultCacheSize, "size of the search cache in MB")
|
||||
|
||||
flag.Parse()
|
||||
|
|
|
@ -22,6 +22,7 @@ type FilesizeFetcher struct {
|
|||
queueMutex sync.Mutex
|
||||
failedOperations map[uint]struct{}
|
||||
wakeUp *time.Ticker
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(fetcherConfig *config.FilesizeFetcherConfig) (fetcher *FilesizeFetcher, err error) {
|
||||
|
@ -133,6 +134,7 @@ func (fetcher *FilesizeFetcher) fillQueue() {
|
|||
operation := NewFetchOperation(fetcher, T)
|
||||
|
||||
if fetcher.addToQueue(operation) {
|
||||
fetcher.wg.Add(1)
|
||||
go operation.Start(fetcher.results)
|
||||
} else {
|
||||
break
|
||||
|
@ -140,8 +142,11 @@ func (fetcher *FilesizeFetcher) fillQueue() {
|
|||
}
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) Run() {
|
||||
func (fetcher *FilesizeFetcher) run() {
|
||||
var result Result
|
||||
|
||||
defer fetcher.wg.Done()
|
||||
|
||||
done := 0
|
||||
fetcher.fillQueue()
|
||||
for done == 0 {
|
||||
|
@ -159,6 +164,12 @@ func (fetcher *FilesizeFetcher) Run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) RunAsync() {
|
||||
fetcher.wg.Add(1)
|
||||
|
||||
go fetcher.run()
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) Close() error {
|
||||
fetcher.queueMutex.Lock()
|
||||
defer fetcher.queueMutex.Unlock()
|
||||
|
@ -169,6 +180,11 @@ func (fetcher *FilesizeFetcher) Close() error {
|
|||
}
|
||||
|
||||
fetcher.done <- 1
|
||||
fetcher.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fetcher *FilesizeFetcher) Wait() {
|
||||
fetcher.wg.Wait()
|
||||
}
|
||||
|
||||
|
|
|
@ -33,6 +33,8 @@ func NewFetchOperation(fetcher *FilesizeFetcher, dbEntry model.Torrent) (op *Fet
|
|||
|
||||
// Should be started from a goroutine somewhere
|
||||
func (op *FetchOperation) Start(out chan Result) {
|
||||
defer op.fetcher.wg.Done()
|
||||
|
||||
magnet := util.InfoHashToMagnet(strings.TrimSpace(op.torrent.Hash), op.torrent.Name, config.Trackers...)
|
||||
downloadingTorrent, err := op.fetcher.torrentClient.AddMagnet(magnet)
|
||||
if err != nil {
|
||||
|
@ -45,14 +47,14 @@ func (op *FetchOperation) Start(out chan Result) {
|
|||
case <-downloadingTorrent.GotInfo():
|
||||
downloadingTorrent.Drop()
|
||||
out <- Result{op, nil, downloadingTorrent.Info()}
|
||||
return
|
||||
break
|
||||
case <-timeoutTicker.C:
|
||||
downloadingTorrent.Drop()
|
||||
out <- Result{op, errors.New("Timeout"), nil}
|
||||
return
|
||||
break
|
||||
case <-op.done:
|
||||
downloadingTorrent.Drop()
|
||||
return
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Référencer dans un nouveau ticket