Albirew/nyaa-pantsu
Albirew
/
nyaa-pantsu
Archivé
1
0
Bifurcation 0

Elasticsearch integration (WIP) (#730)

* Update mapping to be similar to TorrentJSON

* Implement ES search for TorrentParam

* Add seeders/leechers/completed to es index

* Fix filter, use analyzer

* Use ES for the search route

* Add upload_id filtering with ES

* Create/update ES index on torrent upload/update

* Delete from ES index on Delete

* Use ES everywhere, fallback to postgres query

Use Elasticsearch to search the index whenever a call to searchByQuery
is made. Big cleanup needed, but _it werks_.

* Only fetch ids from ES, nothing else

* Use ColumnUpdate instead of Save

* Add FIXME/info to search

* Template needs []TorrentJSON not []Torrent
Cette révision appartient à :
tomleb 2017-05-25 19:48:14 -04:00 révisé par ewhal
Parent c3211c6a14
révision f22d11b35d
12 fichiers modifiés avec 418 ajouts et 35 suppressions

Voir le fichier

@ -14,6 +14,18 @@ const (
APlus
)
// ToString returns the string code of the status filter: "1" for
// FilterRemakes, "2" for Trusted and "3" for APlus. Any other value yields
// the empty string.
func (st *Status) ToString() string {
	status := *st
	if status == FilterRemakes {
		return "1"
	}
	if status == Trusted {
		return "2"
	}
	if status == APlus {
		return "3"
	}
	return ""
}
func (st *Status) Parse(s string) {
switch s {
case "1":
@ -71,6 +83,32 @@ func (s *SortMode) Parse(str string) {
}
}
// ToESField maps the sort mode to the name of the field to sort on.
// Unknown sort modes sort by "id".
//
// INFO The returned names must always be kept in sync with the fields used
// in the elasticsearch index.
// TODO Verify the field in postgres database
func (s *SortMode) ToESField() string {
	// ID and every unrecognised mode share the default.
	field := "id"
	switch *s {
	case Name:
		field = "name"
	case Date:
		field = "date"
	case Downloads:
		field = "downloads"
	case Size:
		field = "filesize"
	case Seeders:
		field = "seeders"
	case Leechers:
		field = "leechers"
	case Completed:
		field = "completed"
	}
	return field
}
// Category identifies a torrent category as a main/sub pair.
// A value of 0 in either field means that part is not set.
type Category struct {
	// Main is the main category number.
	Main uint8
	// Sub is the sub category number.
	Sub uint8
}
@ -87,7 +125,15 @@ func (c Category) String() (s string) {
}
// IsSet reports whether both the main and the sub category are set
// (i.e. both are non-zero).
func (c Category) IsSet() bool {
	return c.IsMainSet() && c.IsSubSet()
}
// IsMainSet reports whether the main category is set (non-zero).
func (c Category) IsMainSet() bool {
	// Main is unsigned, so "greater than zero" is the same as "not zero".
	return c.Main > 0
}
// IsSubSet reports whether the sub category is set (non-zero).
func (c Category) IsSubSet() bool {
	// Sub is unsigned, so "greater than zero" is the same as "not zero".
	return c.Sub > 0
}
// Parse sets category by string

Voir le fichier

@ -1,10 +1,24 @@
package common
import (
"context"
"encoding/json"
"github.com/gorilla/mux"
elastic "gopkg.in/olivere/elastic.v5"
"net/http"
"strconv"
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/db"
"github.com/NyaaPantsu/nyaa/model"
"github.com/NyaaPantsu/nyaa/util/log"
)
// TorrentParam defines all parameters that can be provided when searching for a torrent
type TorrentParam struct {
All bool // True means ignore everything but Max and Offset
Full bool // True means load all members
Order bool // True means acsending
Order bool // True means ascending
Status Status
Sort SortMode
Category Category
@ -17,6 +31,166 @@ type TorrentParam struct {
NameLike string // csv
}
// FromRequest fills p from the request r.
//
// It reads the "page" route variable and the "q", "max", "userID", "s", "c",
// "sort" and "order" query parameters. Missing or malformed values fall back
// to defaults instead of being reported:
//   - q:      "*" when empty
//   - page:   1 when missing, unparsable or 0
//   - max:    config.TorrentsPerPage when missing, unparsable or 0, capped at
//     config.MaxTorrentsPerPage
//   - userID: 0 when missing or unparsable
//
// TODO Should probably return an error ?
func (p *TorrentParam) FromRequest(r *http.Request) {
	// Parse the query string once instead of on every lookup.
	values := r.URL.Query()

	nameLike := values.Get("q")
	if nameLike == "" {
		nameLike = "*"
	}

	// Pages are 1-based: Find computes the result offset as (page-1)*max,
	// so a page of 0 must not be let through or the subtraction underflows.
	pagenum, err := strconv.ParseUint(mux.Vars(r)["page"], 10, 32)
	if err != nil || pagenum == 0 {
		pagenum = 1
	}

	max, err := strconv.ParseUint(values.Get("max"), 10, 32)
	switch {
	case err != nil || max == 0:
		// A page size of 0 is meaningless, treat it like a missing value.
		max = config.TorrentsPerPage
	case max > config.MaxTorrentsPerPage:
		max = config.MaxTorrentsPerPage
	}

	// FIXME 0 means no userID defined
	userID, err := strconv.ParseUint(values.Get("userID"), 10, 32)
	if err != nil {
		userID = 0
	}

	var status Status
	status.Parse(values.Get("s"))

	var category Category
	category.Parse(values.Get("c"))

	var sortMode SortMode
	sortMode.Parse(values.Get("sort"))

	p.NameLike = nameLike
	// NOTE Offset carries the page number here, not a row offset.
	p.Offset = uint32(pagenum)
	p.Max = uint32(max)
	p.UserID = uint32(userID)
	// TODO Use All
	p.All = false
	// TODO Use Full
	p.Full = false
	// Only the literal "true" requests ascending order.
	p.Order = values.Get("order") == "true"
	p.Status = status
	p.Sort = sortMode
	p.Category = category
	// FIXME 0 means no TorrentId defined
	// Do we even need that ?
	p.TorrentID = 0
}
// ToFilterQuery builds the filter expression for an elasticsearch
// "query string" query, as described at
// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html
//
// The result is a space separated list of field:value terms for the
// category, sub category, uploader and status filters that are set on p.
// It returns the empty string when no filter applies.
func (p *TorrentParam) ToFilterQuery() string {
	query := ""
	// addTerm appends one field:value term, separating terms with a single
	// space so that two terms can never run into each other.
	addTerm := func(field, value string) {
		if query != "" {
			query += " "
		}
		query += field + ":" + value
	}

	// Don't set sub category unless main category is set
	if p.Category.IsMainSet() {
		addTerm("category", strconv.FormatUint(uint64(p.Category.Main), 10))
		if p.Category.IsSubSet() {
			addTerm("sub_category", strconv.FormatUint(uint64(p.Category.Sub), 10))
		}
	}

	if p.UserID != 0 {
		addTerm("uploader_id", strconv.FormatUint(uint64(p.UserID), 10))
	}

	if p.Status != ShowAll {
		addTerm("status", p.Status.ToString())
	}

	return query
}
// Find uses elasticsearch to find the torrents matching p.
//
// Only the ids are fetched from ES; the matching rows are then loaded from
// the database in the order ES returned them.
//
// It returns the total number of hits reported by ES, the torrents of the
// requested page, and an error if the ES search, the decoding of a hit or
// the database query failed. On error the other return values are zero.
func (p *TorrentParam) Find(client *elastic.Client) (int64, []model.Torrent, error) {
	// TODO Why is it needed, what does it do ?
	ctx := context.Background()

	query := elastic.NewSimpleQueryStringQuery(p.NameLike).
		Field("name").
		Analyzer(config.DefaultElasticsearchAnalyzer).
		DefaultOperator("AND")

	fsc := elastic.NewFetchSourceContext(true).
		Include("id")

	// Offset holds a 1-based page number. Clamp it so that page 0 cannot
	// underflow, and do the arithmetic in int rather than uint32.
	page := p.Offset
	if page == 0 {
		page = 1
	}
	from := int(page-1) * int(p.Max)

	// TODO Find a better way to keep in sync with mapping in ansible
	search := client.Search().
		Index(config.DefaultElasticsearchIndex).
		Query(query).
		Type(config.DefaultElasticsearchType).
		From(from).
		Size(int(p.Max)).
		Sort(p.Sort.ToESField(), p.Order).
		Sort("_score", false). // Don't put _score before the field sort, it messes with the sorting
		FetchSourceContext(fsc)

	filterQueryString := p.ToFilterQuery()
	if filterQueryString != "" {
		filterQuery := elastic.NewQueryStringQuery(filterQueryString).
			DefaultOperator("AND")
		search = search.PostFilter(filterQuery)
	}

	result, err := search.Do(ctx)
	if err != nil {
		return 0, nil, err
	}

	log.Infof("Query '%s' took %d milliseconds.", p.NameLike, result.TookInMillis)
	log.Infof("Amount of results %d.", result.TotalHits())

	if result.Hits == nil || len(result.Hits.Hits) == 0 {
		return result.TotalHits(), nil, nil
	}

	/* TODO Cleanup this giant mess
	 * The raw query is used because we need to preserve the order of the id's
	 * in the IN clause, so we can't just do
	 * select * from torrents where torrent_id IN (list_of_ids)
	 * This query is said to work on postgres 9.4+
	 */

	// Building a postgres array literal of the form {id1,id2,id3}
	// INFO We are not using Hits.Id because the id in the index might not
	// correspond to the id in the database later on.
	ids := []byte{'{'}
	for _, hit := range result.Hits.Hits {
		if hit.Source == nil {
			continue
		}
		// A fresh value per hit, so an id can never leak from the
		// previous hit when the "id" key is missing.
		var doc struct {
			Id string
		}
		if err := json.Unmarshal(*hit.Source, &doc); err != nil {
			return 0, nil, err
		}
		// The list is spliced into raw SQL below: only accept values that
		// really are integers.
		id, err := strconv.ParseInt(doc.Id, 10, 32)
		if err != nil {
			return 0, nil, err
		}
		if len(ids) > 1 {
			ids = append(ids, ',')
		}
		ids = strconv.AppendInt(ids, id, 10)
	}
	ids = append(ids, '}')

	var torrents []model.Torrent
	err = db.ORM.Raw("SELECT * FROM " + config.TorrentsTableName +
		" JOIN unnest('" + string(ids) + "'::int[]) " +
		" WITH ORDINALITY t(torrent_id, ord) USING (torrent_id) ORDER BY t.ord").
		Find(&torrents).Error
	if err != nil {
		return 0, nil, err
	}
	return result.TotalHits(), torrents, nil
}
func (p *TorrentParam) Clone() TorrentParam {
return TorrentParam{
Order: p.Order,

Voir le fichier

@ -5,4 +5,5 @@ package config
// Pagination limits for torrent listings.
const (
// TorrentsPerPage is the page size used when a request gives no usable one.
TorrentsPerPage = 50
// MaxTorrentsPerPage is the largest page size a request may ask for.
MaxTorrentsPerPage = 300
)

Voir le fichier

@ -4,3 +4,9 @@ type SearchConfig struct {
}
var DefaultSearchConfig = SearchConfig{}
// Names used when talking to elasticsearch.
// NOTE(review): these presumably have to match the index mapping deployed
// alongside the application — confirm when changing either side.
const (
// DefaultElasticsearchAnalyzer is the analyzer applied to name searches.
DefaultElasticsearchAnalyzer = "nyaapantsu_analyzer"
// DefaultElasticsearchIndex is the index that is searched and written to.
DefaultElasticsearchIndex = "nyaapantsu"
DefaultElasticsearchType = "torrents" // Name of the type in the es mapping
)

Voir le fichier

@ -22,14 +22,33 @@ settings:
mappings:
torrents:
properties:
torrent_id:
type: long
torrent_name:
# TODO Consistent ID's type in TorrentJSON
id:
type: text
fielddata: true # Needed to sort by id because it is currently a text field
name:
type: text
analyzer: nyaapantsu_analyzer
fielddata: true # Needed to sort by name because it is an analyzed text field
category:
type: long
type: text
sub_category:
type: long
type: text
status:
type: long
hash:
type: text
date:
type: date
uploader_id:
type: long
downloads:
type: long
seeders:
type: long
leechers:
type: long
completed:
type: long
filesize:
type: long

Voir le fichier

@ -23,24 +23,35 @@ es = Elasticsearch()
pgconn = psycopg2.connect(dbparams)
cur = pgconn.cursor()
cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status
cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status,
torrent_hash, date, uploader, downloads, filesize
FROM torrents
WHERE deleted_at IS NULL""")
fetches = cur.fetchmany(CHUNK_SIZE)
while fetches:
actions = list()
for torrent_id, torrent_name, category, sub_category, status in fetches:
for torrent_id, torrent_name, category, sub_category, status, torrent_hash, date, uploader, downloads, filesize in fetches:
# TODO Add seeds/leech
# TODO Consistent ID representation on the codebase
doc = {
'torrent_id': torrent_id,
'torrent_name': torrent_name.decode('utf-8'),
'category': category,
'sub_category': sub_category,
'status': status
'id': str(torrent_id),
'name': torrent_name.decode('utf-8'),
'category': str(category),
'sub_category': str(sub_category),
'status': status,
'hash': torrent_hash,
'date': date,
'uploader_id': uploader,
'downloads': downloads,
'filesize': filesize,
'seeders': 0, # TODO Get seeders from database
'leechers': 0, # TODO Get leechers from database
'completed': 0 # TODO Get completed from database
}
action = {
'_index': pantsu_index,
'_type': 'document',
'_type': 'torrents',
'_id': torrent_id,
'_source': doc
}

Voir le fichier

@ -1,16 +1,18 @@
package model
import (
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/util"
"github.com/bradfitz/slice"
"fmt"
"html/template"
"context"
"path/filepath"
"strconv"
"strings"
"time"
elastic "gopkg.in/olivere/elastic.v5"
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/util"
"github.com/bradfitz/slice"
)
const (
@ -115,6 +117,29 @@ func (t *Torrent) IsDeleted() bool {
return t.DeletedAt != nil
}
// AddToESIndex writes the JSON form of the torrent to the elasticsearch
// index under the torrent's id and asks for an index refresh as part of
// the request. It returns the error of the index request, if any.
func (t Torrent) AddToESIndex(client *elastic.Client) error {
	doc := t.ToJSON()
	request := client.Index().
		Index(config.DefaultElasticsearchIndex).
		Type(config.DefaultElasticsearchType).
		Id(doc.ID).
		BodyJson(doc).
		Refresh("true")
	if _, err := request.Do(context.Background()); err != nil {
		return err
	}
	return nil
}
// DeleteFromESIndex removes the document whose id is the torrent's ID from
// the elasticsearch index. It returns the error of the delete request, if
// any.
func (t Torrent) DeleteFromESIndex(client *elastic.Client) error {
	docID := strconv.FormatInt(int64(t.ID), 10)
	request := client.Delete().
		Index(config.DefaultElasticsearchIndex).
		Type(config.DefaultElasticsearchType).
		Id(docID)
	if _, err := request.Do(context.Background()); err != nil {
		return err
	}
	return nil
}
/* We need a JSON object instead of a Gorm structure because magnet URLs are
not in the database and have to be generated dynamically */

Voir le fichier

@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"time"
elastic "gopkg.in/olivere/elastic.v5"
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/db"
@ -202,6 +203,18 @@ func APIUploadHandler(w http.ResponseWriter, r *http.Request) {
Uploader: &user,
}
db.ORM.Create(&torrent)
client, err := elastic.NewClient()
if err == nil {
err = torrent.AddToESIndex(client)
if err == nil {
log.Infof("Successfully added torrent to ES index.")
} else {
log.Errorf("Unable to add torrent to ES index: %s", err)
}
} else {
log.Errorf("Unable to create elasticsearch client: %s", err)
}
/*if err != nil {
util.SendError(w, err, 500)
return
@ -260,12 +273,7 @@ func APIUpdateHandler(w http.ResponseWriter, r *http.Request) {
}
update.UpdateTorrent(&torrent)
db.ORM.Model(&torrent).UpdateColumn(&torrent)
if err != nil {
util.SendError(w, err, 500)
return
}
fmt.Printf("%+v\n", torrent)
torrentService.UpdateTorrent(torrent)
}
}

Voir le fichier

@ -4,23 +4,26 @@ import (
"html"
"net/http"
"strconv"
"github.com/gorilla/mux"
"github.com/NyaaPantsu/nyaa/model"
"github.com/NyaaPantsu/nyaa/util"
"github.com/NyaaPantsu/nyaa/util/log"
msg "github.com/NyaaPantsu/nyaa/util/messages"
"github.com/NyaaPantsu/nyaa/util/search"
"github.com/gorilla/mux"
)
// SearchHandler : Controller for displaying search result page, accepting common search arguments
func SearchHandler(w http.ResponseWriter, r *http.Request) {
var err error
// TODO Don't create a new client for each request
messages := msg.GetMessages(r)
// TODO Fallback to postgres search if es is down
vars := mux.Vars(r)
page := vars["page"]
messages := msg.GetMessages(r)
// db params url
var err error
pagenum := 1
if page != "" {
pagenum, err = strconv.Atoi(html.EscapeString(page))
@ -40,17 +43,16 @@ func SearchHandler(w http.ResponseWriter, r *http.Request) {
return
}
b := model.TorrentsToJSON(torrents)
common := newCommonVariables(r)
common.Navigation = navigation{nbTorrents, int(searchParam.Max), pagenum, "search_page"}
commonVar := newCommonVariables(r)
commonVar.Navigation = navigation{int(nbTorrents), int(searchParam.Max), int(searchParam.Page), "search_page"}
// Convert back to strings for now.
common.Search = searchForm{
// TODO Deprecate fully SearchParam and only use TorrentParam
commonVar.Search = searchForm{
SearchParam: searchParam,
Category: searchParam.Category.String(),
ShowItemsPerPage: true,
}
htv := modelListVbs{common, b, messages.GetAllErrors(), messages.GetAllInfos()}
htv := modelListVbs{commonVar, model.TorrentsToJSON(torrents), messages.GetAllErrors(), messages.GetAllInfos()}
err = searchTemplate.ExecuteTemplate(w, "index.html", htv)
if err != nil {

Voir le fichier

@ -5,6 +5,7 @@ import (
"net/http"
"strconv"
"time"
elastic "gopkg.in/olivere/elastic.v5"
"github.com/NyaaPantsu/nyaa/config"
"github.com/NyaaPantsu/nyaa/db"
@ -16,6 +17,7 @@ import (
"github.com/NyaaPantsu/nyaa/service/user"
"github.com/NyaaPantsu/nyaa/service/user/permission"
"github.com/NyaaPantsu/nyaa/util/languages"
"github.com/NyaaPantsu/nyaa/util/log"
msg "github.com/NyaaPantsu/nyaa/util/messages"
)
@ -83,8 +85,23 @@ func UploadPostHandler(w http.ResponseWriter, r *http.Request) {
Description: uploadForm.Description,
WebsiteLink: uploadForm.WebsiteLink,
UploaderID: user.ID}
db.ORM.Create(&torrent)
client, err := elastic.NewClient()
if err == nil {
err = torrent.AddToESIndex(client)
if err == nil {
log.Infof("Successfully added torrent to ES index.")
} else {
log.Errorf("Unable to add torrent to ES index: %s", err)
}
} else {
log.Errorf("Unable to create elasticsearch client: %s", err)
}
url, err := Router.Get("view_torrent").URL("id", strconv.FormatUint(uint64(torrent.ID), 10))
if user.ID > 0 && config.DefaultUserSettings["new_torrent"] { // If we are a member and notifications for new torrents are enabled

Voir le fichier

@ -2,6 +2,7 @@ package torrentService
import (
"errors"
elastic "gopkg.in/olivere/elastic.v5"
"net/http"
"strconv"
"strings"
@ -11,7 +12,7 @@ import (
"github.com/NyaaPantsu/nyaa/model"
"github.com/NyaaPantsu/nyaa/service"
"github.com/NyaaPantsu/nyaa/util"
// "github.com/NyaaPantsu/nyaa/util/log"
"github.com/NyaaPantsu/nyaa/util/log"
)
/* Function to interact with Models
@ -203,6 +204,19 @@ func DeleteTorrent(id string) (int, error) {
if db.ORM.Delete(&torrent).Error != nil {
return http.StatusInternalServerError, errors.New("Torrent was not deleted.")
}
// TODO Don't create a new client for each request
client, err := elastic.NewClient()
if err == nil {
err = torrent.DeleteFromESIndex(client)
if err == nil {
log.Infof("Successfully deleted torrent to ES index.")
} else {
log.Errorf("Unable to delete torrent to ES index: %s", err)
}
} else {
log.Errorf("Unable to create elasticsearch client: %s", err)
}
return http.StatusOK, nil
}
@ -214,6 +228,19 @@ func DefinitelyDeleteTorrent(id string) (int, error) {
if db.ORM.Unscoped().Model(&torrent).Delete(&torrent).Error != nil {
return http.StatusInternalServerError, errors.New("Torrent was not deleted.")
}
// TODO Don't create a new client for each request
client, err := elastic.NewClient()
if err == nil {
err = torrent.DeleteFromESIndex(client)
if err == nil {
log.Infof("Successfully deleted torrent to ES index.")
} else {
log.Errorf("Unable to delete torrent to ES index: %s", err)
}
} else {
log.Errorf("Unable to create elasticsearch client: %s", err)
}
return http.StatusOK, nil
}
@ -238,6 +265,19 @@ func UpdateTorrent(torrent model.Torrent) (int, error) {
return http.StatusInternalServerError, errors.New("Torrent was not updated.")
}
// TODO Don't create a new client for each request
client, err := elastic.NewClient()
if err == nil {
err = torrent.AddToESIndex(client)
if err == nil {
log.Infof("Successfully updated torrent to ES index.")
} else {
log.Errorf("Unable to update torrent to ES index: %s", err)
}
} else {
log.Errorf("Unable to create elasticsearch client: %s", err)
}
return http.StatusOK, nil
}

Voir le fichier

@ -6,6 +6,7 @@ import (
"strings"
"unicode"
"unicode/utf8"
elastic "gopkg.in/olivere/elastic.v5"
"github.com/NyaaPantsu/nyaa/cache"
"github.com/NyaaPantsu/nyaa/common"
@ -63,8 +64,41 @@ func SearchByQueryDeleted(r *http.Request, pagenum int) (search common.SearchPar
return
}
// searchByQuery runs the search described by r, using elasticsearch when
// possible and the postgres query otherwise.
//
// The postgres query is used when:
//   - deleted torrents are requested, because es doesn't index deleted
//     torrents and would therefore return the wrong set;
//   - the elasticsearch client cannot be created;
//   - the elasticsearch search itself fails.
//
// TODO Clean this up
// FIXME Some fields are not used by elasticsearch (pagenum, countAll, withUser)
// pagenum is extracted from request in .FromRequest()
// elasticsearch always provide a count to how many hits
func searchByQuery(r *http.Request, pagenum int, countAll bool, withUser bool, deleted bool) (
	search common.SearchParam, tor []model.Torrent, count int, err error,
) {
	if deleted {
		return searchByQueryPostgres(r, pagenum, countAll, withUser, deleted)
	}

	client, err := elastic.NewClient()
	if err != nil {
		log.Errorf("Unable to create elasticsearch client: %s", err)
		log.Errorf("Falling back to postgresql query")
		return searchByQueryPostgres(r, pagenum, countAll, withUser, deleted)
	}

	var torrentParam common.TorrentParam
	torrentParam.FromRequest(r)
	totalHits, torrents, err := torrentParam.Find(client)
	if err != nil {
		log.Errorf("Elasticsearch query failed: %s", err)
		log.Errorf("Falling back to postgresql query")
		return searchByQueryPostgres(r, pagenum, countAll, withUser, deleted)
	}

	// Convert the es parameters back to the SearchParam the callers expect.
	searchParam := common.SearchParam{
		Order:    torrentParam.Order,
		Status:   torrentParam.Status,
		Sort:     torrentParam.Sort,
		Category: torrentParam.Category,
		Page:     int(torrentParam.Offset),
		UserID:   uint(torrentParam.UserID),
		Max:      uint(torrentParam.Max),
		NotNull:  torrentParam.NotNull,
		Query:    torrentParam.NameLike,
	}
	return searchParam, torrents, int(totalHits), nil
}
func searchByQueryPostgres(r *http.Request, pagenum int, countAll bool, withUser bool, deleted bool) (
search common.SearchParam, tor []model.Torrent, count int, err error,
) {
max, err := strconv.ParseUint(r.URL.Query().Get("max"), 10, 32)
if err != nil {