abstract out cache
Cette révision appartient à :
Parent
a361bcca4f
révision
059ea7d2a8
11 fichiers modifiés avec 237 ajouts et 123 suppressions
139
cache/cache.go
externe
139
cache/cache.go
externe
|
@ -1,132 +1,37 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ewhal/nyaa/cache/memcache"
|
||||
"github.com/ewhal/nyaa/cache/native"
|
||||
"github.com/ewhal/nyaa/cache/nop"
|
||||
"github.com/ewhal/nyaa/common"
|
||||
"github.com/ewhal/nyaa/config"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
|
||||
"errors"
|
||||
)
|
||||
|
||||
const expiryTime = time.Minute
|
||||
|
||||
var (
|
||||
cache = make(map[common.SearchParam]*list.Element, 10)
|
||||
ll = list.New()
|
||||
totalUsed int
|
||||
mu sync.Mutex
|
||||
|
||||
// Size sets the maximum size of the cache before evicting unread data in MB
|
||||
Size float64 = 1 << 10
|
||||
)
|
||||
|
||||
// Key stores the ID of either a thread or board page
|
||||
type Key struct {
|
||||
LastN uint8
|
||||
Board string
|
||||
ID uint64
|
||||
// Cache defines interface for caching search results
|
||||
type Cache interface {
|
||||
Get(key common.SearchParam, r func() ([]model.Torrent, int, error)) ([]model.Torrent, int, error)
|
||||
ClearAll()
|
||||
}
|
||||
|
||||
// Single cache entry
|
||||
type store struct {
|
||||
sync.Mutex // Controls general access to the contents of the struct
|
||||
lastFetched time.Time
|
||||
key common.SearchParam
|
||||
data []model.Torrent
|
||||
count, size int
|
||||
}
|
||||
var ErrInvalidCacheDialect = errors.New("invalid cache dialect")
|
||||
|
||||
// Check the cache for and existing record. If miss, run fn to retrieve fresh
|
||||
// values.
|
||||
func Get(key common.SearchParam, fn func() ([]model.Torrent, int, error)) (
|
||||
data []model.Torrent, count int, err error,
|
||||
) {
|
||||
s := getStore(key)
|
||||
// Impl cache implementation instance
|
||||
var Impl Cache
|
||||
|
||||
// Also keeps multiple requesters from simultaneously requesting the same
|
||||
// data
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if s.isFresh() {
|
||||
return s.data, s.count, nil
|
||||
}
|
||||
|
||||
data, count, err = fn()
|
||||
if err != nil {
|
||||
func Init(conf *config.CacheConfig) (err error) {
|
||||
switch conf.Dialect {
|
||||
case "native":
|
||||
Impl = native.New(conf.Size)
|
||||
return
|
||||
case "memcache":
|
||||
Impl = memcache.New()
|
||||
return
|
||||
default:
|
||||
Impl = nop.New()
|
||||
}
|
||||
s.update(data, count)
|
||||
return
|
||||
}
|
||||
|
||||
// Retrieve a store from the cache or create a new one
|
||||
func getStore(k common.SearchParam) (s *store) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
el := cache[k]
|
||||
if el == nil {
|
||||
s = &store{key: k}
|
||||
cache[k] = ll.PushFront(s)
|
||||
} else {
|
||||
ll.MoveToFront(el)
|
||||
s = el.Value.(*store)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Clear the cache. Only used for testing.
|
||||
func Clear() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
ll = list.New()
|
||||
cache = make(map[common.SearchParam]*list.Element, 10)
|
||||
}
|
||||
|
||||
// Update the total used memory counter and evict, if over limit
|
||||
func updateUsedSize(delta int) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
totalUsed += delta
|
||||
|
||||
for totalUsed > int(Size)<<20 {
|
||||
e := ll.Back()
|
||||
if e == nil {
|
||||
break
|
||||
}
|
||||
s := ll.Remove(e).(*store)
|
||||
delete(cache, s.key)
|
||||
totalUsed -= s.size
|
||||
}
|
||||
}
|
||||
|
||||
// Return, if the data can still be considered fresh, without querying the DB
|
||||
func (s *store) isFresh() bool {
|
||||
if s.lastFetched.IsZero() { // New store
|
||||
return false
|
||||
}
|
||||
return s.lastFetched.Add(expiryTime).After(time.Now())
|
||||
}
|
||||
|
||||
// Stores the new values of s. Calculates and stores the new size. Passes the
|
||||
// delta to the central cache to fire eviction checks.
|
||||
func (s *store) update(data []model.Torrent, count int) {
|
||||
newSize := 0
|
||||
for _, d := range data {
|
||||
newSize += d.Size()
|
||||
}
|
||||
s.data = data
|
||||
s.count = count
|
||||
delta := newSize - s.size
|
||||
s.size = newSize
|
||||
s.lastFetched = time.Now()
|
||||
|
||||
// Technically it is possible to update the size even when the store is
|
||||
// already evicted, but that should never happen, unless you have a very
|
||||
// small cache, very large stored datasets and a lot of traffic.
|
||||
updateUsedSize(delta)
|
||||
}
|
||||
|
|
21
cache/memcache/memcache.go
externe
Fichier normal
21
cache/memcache/memcache.go
externe
Fichier normal
|
@ -0,0 +1,21 @@
|
|||
package memcache
|
||||
|
||||
import (
|
||||
"github.com/ewhal/nyaa/common"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
)
|
||||
|
||||
type Memcache struct {
|
||||
}
|
||||
|
||||
func (c *Memcache) Get(key common.SearchParam, r func() ([]model.Torrent, int, error)) (torrents []model.Torrent, num int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *Memcache) ClearAll() {
|
||||
|
||||
}
|
||||
|
||||
func New() *Memcache {
|
||||
return &Memcache{}
|
||||
}
|
143
cache/native/native.go
externe
Fichier normal
143
cache/native/native.go
externe
Fichier normal
|
@ -0,0 +1,143 @@
|
|||
package native
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ewhal/nyaa/common"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
)
|
||||
|
||||
const expiryTime = time.Minute
|
||||
|
||||
// NativeCache implements cache.Cache
|
||||
type NativeCache struct {
|
||||
cache map[common.SearchParam]*list.Element
|
||||
ll *list.List
|
||||
totalUsed int
|
||||
mu sync.Mutex
|
||||
|
||||
// Size sets the maximum size of the cache before evicting unread data in MB
|
||||
Size float64
|
||||
}
|
||||
|
||||
// New Creates New Native Cache instance
|
||||
func New(sz float64) *NativeCache {
|
||||
return &NativeCache{
|
||||
cache: make(map[common.SearchParam]*list.Element, 10),
|
||||
Size: sz,
|
||||
ll: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Key stores the ID of either a thread or board page
|
||||
type Key struct {
|
||||
LastN uint8
|
||||
Board string
|
||||
ID uint64
|
||||
}
|
||||
|
||||
// Single cache entry
|
||||
type store struct {
|
||||
sync.Mutex // Controls general access to the contents of the struct
|
||||
lastFetched time.Time
|
||||
key common.SearchParam
|
||||
data []model.Torrent
|
||||
count, size int
|
||||
n *NativeCache
|
||||
}
|
||||
|
||||
// Check the cache for and existing record. If miss, run fn to retrieve fresh
|
||||
// values.
|
||||
func (n *NativeCache) Get(key common.SearchParam, fn func() ([]model.Torrent, int, error)) (
|
||||
data []model.Torrent, count int, err error,
|
||||
) {
|
||||
s := n.getStore(key)
|
||||
|
||||
// Also keeps multiple requesters from simultaneously requesting the same
|
||||
// data
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if s.isFresh() {
|
||||
return s.data, s.count, nil
|
||||
}
|
||||
|
||||
data, count, err = fn()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.update(data, count)
|
||||
return
|
||||
}
|
||||
|
||||
// Retrieve a store from the cache or create a new one
|
||||
func (n *NativeCache) getStore(k common.SearchParam) (s *store) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
el := n.cache[k]
|
||||
if el == nil {
|
||||
s = &store{key: k, n: n}
|
||||
n.cache[k] = n.ll.PushFront(s)
|
||||
} else {
|
||||
n.ll.MoveToFront(el)
|
||||
s = el.Value.(*store)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Clear the cache. Only used for testing.
|
||||
func (n *NativeCache) ClearAll() {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
n.ll = list.New()
|
||||
n.cache = make(map[common.SearchParam]*list.Element, 10)
|
||||
}
|
||||
|
||||
// Update the total used memory counter and evict, if over limit
|
||||
func (n *NativeCache) updateUsedSize(delta int) {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
n.totalUsed += delta
|
||||
|
||||
for n.totalUsed > int(n.Size)<<20 {
|
||||
e := n.ll.Back()
|
||||
if e == nil {
|
||||
break
|
||||
}
|
||||
s := n.ll.Remove(e).(*store)
|
||||
delete(n.cache, s.key)
|
||||
n.totalUsed -= s.size
|
||||
}
|
||||
}
|
||||
|
||||
// Return, if the data can still be considered fresh, without querying the DB
|
||||
func (s *store) isFresh() bool {
|
||||
if s.lastFetched.IsZero() { // New store
|
||||
return false
|
||||
}
|
||||
return s.lastFetched.Add(expiryTime).After(time.Now())
|
||||
}
|
||||
|
||||
// Stores the new values of s. Calculates and stores the new size. Passes the
|
||||
// delta to the central cache to fire eviction checks.
|
||||
func (s *store) update(data []model.Torrent, count int) {
|
||||
newSize := 0
|
||||
for _, d := range data {
|
||||
newSize += d.Size()
|
||||
}
|
||||
s.data = data
|
||||
s.count = count
|
||||
delta := newSize - s.size
|
||||
s.size = newSize
|
||||
s.lastFetched = time.Now()
|
||||
|
||||
// Technically it is possible to update the size even when the store is
|
||||
// already evicted, but that should never happen, unless you have a very
|
||||
// small cache, very large stored datasets and a lot of traffic.
|
||||
s.n.updateUsedSize(delta)
|
||||
}
|
22
cache/nop/nop.go
externe
Fichier normal
22
cache/nop/nop.go
externe
Fichier normal
|
@ -0,0 +1,22 @@
|
|||
package nop
|
||||
|
||||
import (
|
||||
"github.com/ewhal/nyaa/common"
|
||||
"github.com/ewhal/nyaa/model"
|
||||
)
|
||||
|
||||
type NopCache struct {
|
||||
}
|
||||
|
||||
func (c *NopCache) Get(key common.SearchParam, fn func() ([]model.Torrent, int, error)) ([]model.Torrent, int, error) {
|
||||
return fn()
|
||||
}
|
||||
|
||||
func (c *NopCache) ClearAll() {
|
||||
|
||||
}
|
||||
|
||||
// New creates a new Cache that does NOTHING :D
|
||||
func New() *NopCache {
|
||||
return &NopCache{}
|
||||
}
|
14
config/cache.go
Fichier normal
14
config/cache.go
Fichier normal
|
@ -0,0 +1,14 @@
|
|||
package config
|
||||
|
||||
// CacheConfig is config struct for caching strategy
|
||||
type CacheConfig struct {
|
||||
Dialect string
|
||||
URL string
|
||||
Size float64
|
||||
}
|
||||
|
||||
const DefaultCacheSize = 1 << 10
|
||||
|
||||
var DefaultCacheConfig = CacheConfig{
|
||||
Dialect: "nop",
|
||||
}
|
|
@ -24,11 +24,13 @@ type Config struct {
|
|||
DBParams string `json:"db_params"`
|
||||
// tracker scraper config (required)
|
||||
Scrape ScraperConfig `json:"scraper"`
|
||||
// cache config
|
||||
Cache CacheConfig `json:"cache"`
|
||||
// optional i2p configuration
|
||||
I2P *I2PConfig `json:"i2p"`
|
||||
}
|
||||
|
||||
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", DefaultScraperConfig, nil}
|
||||
var Defaults = Config{"localhost", 9999, "sqlite3", "./nyaa.db?cache_size=50", DefaultScraperConfig, DefaultCacheConfig, nil}
|
||||
|
||||
var allowedDatabaseTypes = map[string]bool{
|
||||
"sqlite3": true,
|
||||
|
@ -44,6 +46,7 @@ func New() *Config {
|
|||
config.DBType = Defaults.DBType
|
||||
config.DBParams = Defaults.DBParams
|
||||
config.Scrape = Defaults.Scrape
|
||||
config.Cache = Defaults.Cache
|
||||
return &config
|
||||
}
|
||||
|
||||
|
|
6
main.go
6
main.go
|
@ -98,7 +98,7 @@ func main() {
|
|||
processFlags := conf.BindFlags()
|
||||
defaults := flag.Bool("print-defaults", false, "print the default configuration file on stdout")
|
||||
mode := flag.String("mode", "webapp", "which mode to run daemon in, either webapp or scraper")
|
||||
flag.Float64Var(&cache.Size, "c", cache.Size, "size of the search cache in MB")
|
||||
flag.Float64Var(&conf.Cache.Size, "c", config.DefaultCacheSize, "size of the search cache in MB")
|
||||
flag.Parse()
|
||||
if *defaults {
|
||||
stdout := bufio.NewWriter(os.Stdout)
|
||||
|
@ -121,6 +121,10 @@ func main() {
|
|||
log.Fatal(err.Error())
|
||||
}
|
||||
initI18N()
|
||||
err = cache.Init(&conf.Cache)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
go signals.Handle()
|
||||
if len(config.TorrentFileStorage) > 0 {
|
||||
err := os.MkdirAll(config.TorrentFileStorage, 0700)
|
||||
|
|
|
@ -43,7 +43,7 @@ func HomeHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Max: uint(maxPerPage),
|
||||
Page: pagenum,
|
||||
}
|
||||
torrents, nbTorrents, err := cache.Get(search, func() ([]model.Torrent, int, error) {
|
||||
torrents, nbTorrents, err := cache.Impl.Get(search, func() ([]model.Torrent, int, error) {
|
||||
torrents, nbTorrents, err := torrentService.GetAllTorrents(maxPerPage, maxPerPage*(pagenum-1))
|
||||
if !log.CheckError(err) {
|
||||
util.SendError(w, err, 400)
|
||||
|
|
|
@ -93,7 +93,7 @@ func (f *UploadForm) ExtractInfo(r *http.Request) error {
|
|||
f.Name = util.TrimWhitespaces(f.Name)
|
||||
f.Description = p.Sanitize(util.TrimWhitespaces(f.Description))
|
||||
f.Magnet = util.TrimWhitespaces(f.Magnet)
|
||||
cache.Clear()
|
||||
cache.Impl.ClearAll()
|
||||
|
||||
catsSplit := strings.Split(f.Category, "_")
|
||||
// need this to prevent out of index panics
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"github.com/ewhal/nyaa/model"
|
||||
"github.com/ewhal/nyaa/service"
|
||||
"github.com/ewhal/nyaa/util"
|
||||
"github.com/ewhal/nyaa/util/log"
|
||||
)
|
||||
|
||||
/* Function to interact with Models
|
||||
|
@ -140,6 +141,7 @@ func getTorrentsOrderBy(parameters *serviceBase.WhereParams, orderBy string, lim
|
|||
if limit != 0 || offset != 0 { // if limits provided
|
||||
dbQuery = dbQuery + " LIMIT " + strconv.Itoa(limit) + " OFFSET " + strconv.Itoa(offset)
|
||||
}
|
||||
log.Infof("SQL: %s", dbQuery)
|
||||
err = db.ORM.Raw(dbQuery, params...).Find(&torrents).Error
|
||||
return
|
||||
}
|
||||
|
|
|
@ -118,7 +118,7 @@ func searchByQuery(r *http.Request, pagenum int, countAll bool) (
|
|||
orderBy += "desc"
|
||||
}
|
||||
|
||||
tor, count, err = cache.Get(search, func() (tor []model.Torrent, count int, err error) {
|
||||
tor, count, err = cache.Impl.Get(search, func() (tor []model.Torrent, count int, err error) {
|
||||
parameters := serviceBase.WhereParams{
|
||||
Params: make([]interface{}, 0, 64),
|
||||
}
|
||||
|
|
Référencer dans un nouveau ticket