From f22d11b35d822f5cc4aa3b1f9d63fbb34c17c121 Mon Sep 17 00:00:00 2001 From: tomleb Date: Thu, 25 May 2017 19:48:14 -0400 Subject: [PATCH] Elasticsearch integration (WIP) (#730) * Update mapping to be similar to TorrentJSON * Implement ES search for TorrentParam * Add seeders/leechers/completed to es index * Fix filter, use analyzer * Use ES for the search route * Add upload_id filtering with ES * Create/update ES index on torrent upload/update * Delete from ES index on Delete * Use ES everywhere, fallback to postgres query Use Elasticsearch to search the index whenever a call to searchByQuery is made. Big cleanup needed, but _it werks_. * Only fetch ids from ES, nothing else * Use ColumnUpdate instead of Save * Add FIXME/info to search * Template needs []TorrentJSON not []Torrent --- common/search.go | 48 ++++- common/torrent.go | 176 +++++++++++++++++- config/navigation.go | 1 + config/search.go | 6 + .../files/elasticsearch_settings.yml | 29 ++- .../elasticsearch/files/index_nyaapantsu.py | 27 ++- model/torrent.go | 33 +++- router/api_handler.go | 20 +- router/search_handler.go | 20 +- router/upload_handler.go | 17 ++ service/torrent/torrent.go | 42 ++++- util/search/search.go | 34 ++++ 12 files changed, 418 insertions(+), 35 deletions(-) diff --git a/common/search.go b/common/search.go index 8da753e7..f2399c5f 100644 --- a/common/search.go +++ b/common/search.go @@ -14,6 +14,18 @@ const ( APlus ) +func (st *Status) ToString() string { + switch *st { + case FilterRemakes: + return "1" + case Trusted: + return "2" + case APlus: + return "3" + } + return "" +} + func (st *Status) Parse(s string) { switch s { case "1": @@ -71,6 +83,32 @@ func (s *SortMode) Parse(str string) { } } +/* INFO Always need to keep in sync with the field that are used in the + * elasticsearch index. + * TODO Verify the field in postgres database + */ +func (s *SortMode) ToESField() string { + switch *s { + case ID: + return "id" + case Name: + return "name" + case Date: + return "date" + case Downloads: + return "downloads" + case Size: + return "filesize" + case Seeders: + return "seeders" + case Leechers: + return "leechers" + case Completed: + return "completed" + } + return "id" +} + type Category struct { Main, Sub uint8 } @@ -87,7 +125,15 @@ func (c Category) String() (s string) { } func (c Category) IsSet() bool { - return c.Main != 0 && c.Sub != 0 + return c.IsMainSet() && c.IsSubSet() +} + +func (c Category) IsMainSet() bool { + return c.Main != 0 +} + +func (c Category) IsSubSet() bool { + return c.Sub != 0 } // Parse sets category by string diff --git a/common/torrent.go b/common/torrent.go index 1e7f28ef..b332f9bc 100644 --- a/common/torrent.go +++ b/common/torrent.go @@ -1,10 +1,24 @@ package common +import ( + "context" + "encoding/json" + "github.com/gorilla/mux" + elastic "gopkg.in/olivere/elastic.v5" + "net/http" + "strconv" + + "github.com/NyaaPantsu/nyaa/config" + "github.com/NyaaPantsu/nyaa/db" + "github.com/NyaaPantsu/nyaa/model" + "github.com/NyaaPantsu/nyaa/util/log" +) + // TorrentParam defines all parameters that can be provided when searching for a torrent type TorrentParam struct { All bool // True means ignore everything but Max and Offset Full bool // True means load all members - Order bool // True means acsending + Order bool // True means ascending Status Status Sort SortMode Category Category @@ -17,6 +31,166 @@ type TorrentParam struct { NameLike string // csv } +// TODO Should probably return an error ? +func (p *TorrentParam) FromRequest(r *http.Request) { + var err error + + nameLike := r.URL.Query().Get("q") + if nameLike == "" { + nameLike = "*" + } + + page := mux.Vars(r)["page"] + pagenum, err := strconv.ParseUint(page, 10, 32) + if err != nil { + pagenum = 1 + } + + max, err := strconv.ParseUint(r.URL.Query().Get("max"), 10, 32) + if err != nil { + max = config.TorrentsPerPage + } else if max > config.MaxTorrentsPerPage { + max = config.MaxTorrentsPerPage + } + + // FIXME 0 means no userId defined + userId, err := strconv.ParseUint(r.URL.Query().Get("userID"), 10, 32) + if err != nil { + userId = 0 + } + + var status Status + status.Parse(r.URL.Query().Get("s")) + + var category Category + category.Parse(r.URL.Query().Get("c")) + + var sortMode SortMode + sortMode.Parse(r.URL.Query().Get("sort")) + + ascending := false + if r.URL.Query().Get("order") == "true" { + ascending = true + } + + p.NameLike = nameLike + p.Offset = uint32(pagenum) + p.Max = uint32(max) + p.UserID = uint32(userId) + // TODO Use All + p.All = false + // TODO Use Full + p.Full = false + p.Order = ascending + p.Status = status + p.Sort = sortMode + p.Category = category + // FIXME 0 means no TorrentId defined + // Do we even need that ? + p.TorrentID = 0 +} + +// Builds a query string with for es query string query defined here +// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html +func (p *TorrentParam) ToFilterQuery() string { + // Don't set sub category unless main category is set + query := "" + if p.Category.IsMainSet() { + query += "category:" + strconv.FormatInt(int64(p.Category.Main), 10) + if p.Category.IsSubSet() { + query += " sub_category:" + strconv.FormatInt(int64(p.Category.Sub), 10) + } + } + + if p.UserID != 0 { + query += "uploader_id:" + strconv.FormatInt(int64(p.UserID), 10) + } + + if p.Status != ShowAll { + query += " status:" + p.Status.ToString() + } + return query +} + +/* Uses elasticsearch to find the torrents based on TorrentParam + * We decided to fetch only the ids from ES and then query these ids to the + * database + */ +func (p *TorrentParam) Find(client *elastic.Client) (int64, []model.Torrent, error) { + // TODO Why is it needed, what does it do ? + ctx := context.Background() + + query := elastic.NewSimpleQueryStringQuery(p.NameLike). + Field("name"). + Analyzer(config.DefaultElasticsearchAnalyzer). + DefaultOperator("AND") + + fsc := elastic.NewFetchSourceContext(true). + Include("id") + + // TODO Find a better way to keep in sync with mapping in ansible + search := client.Search(). + Index(config.DefaultElasticsearchIndex). + Query(query). + Type(config.DefaultElasticsearchType). + From(int((p.Offset-1)*p.Max)). + Size(int(p.Max)). + Sort(p.Sort.ToESField(), p.Order). + Sort("_score", false). // Don't put _score before the field sort, it messes with the sorting + FetchSourceContext(fsc) + + filterQueryString := p.ToFilterQuery() + if filterQueryString != "" { + filterQuery := elastic.NewQueryStringQuery(filterQueryString). + DefaultOperator("AND") + search = search.PostFilter(filterQuery) + } + + result, err := search.Do(ctx) + if err != nil { + return 0, nil, err + } + + log.Infof("Query '%s' took %d milliseconds.", p.NameLike, result.TookInMillis) + log.Infof("Amount of results %d.", result.TotalHits()) + + /* TODO Cleanup this giant mess + * The raw query is used because we need to preserve the order of the id's + * in the IN clause, so we can't just do + * select * from torrents where torrent_id IN (list_of_ids) + * This query is said to work on postgres 9.4+ + */ + { + // Temporary struct to hold the id + // INFO We are not using Hits.Id because the id in the index might not + // correspond to the id in the database later on. + type TId struct { + Id string + } + var tid TId + var torrents []model.Torrent + if len(result.Hits.Hits) > 0 { + torrents = make([]model.Torrent, len(result.Hits.Hits)) + hits := result.Hits.Hits + // Building a string of the form {id1,id2,id3} + source, _ := hits[0].Source.MarshalJSON() + json.Unmarshal(source, &tid) + idsToString := "{" + tid.Id + for _, t := range hits[1:] { + source, _ = t.Source.MarshalJSON() + json.Unmarshal(source, &tid) + idsToString += "," + tid.Id + } + idsToString += "}" + db.ORM.Raw("SELECT * FROM " + config.TorrentsTableName + + " JOIN unnest('" + idsToString + "'::int[]) " + + " WITH ORDINALITY t(torrent_id, ord) USING (torrent_id) ORDER BY t.ord").Find(&torrents) + } + return result.TotalHits(), torrents, nil + } + +} + func (p *TorrentParam) Clone() TorrentParam { return TorrentParam{ Order: p.Order, diff --git a/config/navigation.go b/config/navigation.go index 91374b94..bcffd4ba 100644 --- a/config/navigation.go +++ b/config/navigation.go @@ -5,4 +5,5 @@ package config const ( TorrentsPerPage = 50 + MaxTorrentsPerPage = 300 ) diff --git a/config/search.go b/config/search.go index 19eabbdc..1b4dbaf4 100644 --- a/config/search.go +++ b/config/search.go @@ -4,3 +4,9 @@ type SearchConfig struct { } var DefaultSearchConfig = SearchConfig{} + +const ( + DefaultElasticsearchAnalyzer = "nyaapantsu_analyzer" + DefaultElasticsearchIndex = "nyaapantsu" + DefaultElasticsearchType = "torrents" // Name of the type in the es mapping +) diff --git a/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml b/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml index 5b95c725..32a015aa 100644 --- a/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml +++ b/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml @@ -22,14 +22,33 @@ settings: mappings: torrents: properties: - torrent_id: - type: long - torrent_name: + # TODO Consistent ID's type in TorrentJSON + id: + type: text + fielddata: true # Use to sort by id because it is currently a text field + name: type: text analyzer: nyaapantsu_analyzer + fielddata: true # Use to sort by id because it is currently a text field category: - type: long + type: text sub_category: - type: long + type: text status: type: long + hash: + type: text + date: + type: date + uploader_id: + type: long + downloads: + type: long + seeders: + type: long + leechers: + type: long + completed: + type: long + filesize: + type: long diff --git a/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py b/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py index 64e2216b..9ab4569e 100644 --- a/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py +++ b/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py @@ -23,24 +23,35 @@ es = Elasticsearch() pgconn = psycopg2.connect(dbparams) cur = pgconn.cursor() -cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status +cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status, + torrent_hash, date, uploader, downloads, filesize FROM torrents WHERE deleted_at IS NULL""") fetches = cur.fetchmany(CHUNK_SIZE) while fetches: actions = list() - for torrent_id, torrent_name, category, sub_category, status in fetches: + for torrent_id, torrent_name, category, sub_category, status, torrent_hash, date, uploader, downloads, filesize in fetches: + # TODO Add seeds/leech + # TODO Consistent ID representation on the codebase doc = { - 'torrent_id': torrent_id, - 'torrent_name': torrent_name.decode('utf-8'), - 'category': category, - 'sub_category': sub_category, - 'status': status + 'id': str(torrent_id), + 'name': torrent_name.decode('utf-8'), + 'category': str(category), + 'sub_category': str(sub_category), + 'status': status, + 'hash': torrent_hash, + 'date': date, + 'uploader_id': uploader, + 'downloads': downloads, + 'filesize': filesize, + 'seeders': 0, # TODO Get seeders from database + 'leechers': 0, # TODO Get leechers from database + 'completed': 0 # TODO Get completed from database } action = { '_index': pantsu_index, - '_type': 'document', + '_type': 'torrents', '_id': torrent_id, '_source': doc } diff --git a/model/torrent.go b/model/torrent.go index 089d3d0b..aedcc9f4 100644 --- a/model/torrent.go +++ b/model/torrent.go @@ -1,16 +1,18 @@ package model import ( - "github.com/NyaaPantsu/nyaa/config" - "github.com/NyaaPantsu/nyaa/util" - "github.com/bradfitz/slice" - "fmt" "html/template" + "context" "path/filepath" "strconv" "strings" "time" + elastic "gopkg.in/olivere/elastic.v5" + + "github.com/NyaaPantsu/nyaa/config" + "github.com/NyaaPantsu/nyaa/util" + "github.com/bradfitz/slice" ) const ( @@ -115,6 +117,29 @@ func (t *Torrent) IsDeleted() bool { return t.DeletedAt != nil } +func (t Torrent) AddToESIndex(client *elastic.Client) error { + ctx := context.Background() + torrentJson := t.ToJSON() + _, err := client.Index(). + Index(config.DefaultElasticsearchIndex). + Type(config.DefaultElasticsearchType). + Id(torrentJson.ID). + BodyJson(torrentJson). + Refresh("true"). + Do(ctx) + return err +} + +func (t Torrent) DeleteFromESIndex(client *elastic.Client) error { + ctx := context.Background() + _, err := client.Delete(). + Index(config.DefaultElasticsearchIndex). + Type(config.DefaultElasticsearchType). + Id(strconv.FormatInt(int64(t.ID), 10)). + Do(ctx) + return err +} + /* We need a JSON object instead of a Gorm structure because magnet URLs are not in the database and have to be generated dynamically */ diff --git a/router/api_handler.go b/router/api_handler.go index a5c972bd..ab042133 100644 --- a/router/api_handler.go +++ b/router/api_handler.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "time" + elastic "gopkg.in/olivere/elastic.v5" "github.com/NyaaPantsu/nyaa/config" "github.com/NyaaPantsu/nyaa/db" @@ -202,6 +203,18 @@ func APIUploadHandler(w http.ResponseWriter, r *http.Request) { Uploader: &user, } db.ORM.Create(&torrent) + + client, err := elastic.NewClient() + if err == nil { + err = torrent.AddToESIndex(client) + if err == nil { + log.Infof("Successfully added torrent to ES index.") + } else { + log.Errorf("Unable to add torrent to ES index: %s", err) + } + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + } /*if err != nil { util.SendError(w, err, 500) return @@ -260,12 +273,7 @@ func APIUpdateHandler(w http.ResponseWriter, r *http.Request) { } update.UpdateTorrent(&torrent) - db.ORM.Model(&torrent).UpdateColumn(&torrent) - if err != nil { - util.SendError(w, err, 500) - return - } - fmt.Printf("%+v\n", torrent) + torrentService.UpdateTorrent(torrent) } } diff --git a/router/search_handler.go b/router/search_handler.go index c57e5b92..06b6ab80 100644 --- a/router/search_handler.go +++ b/router/search_handler.go @@ -4,23 +4,26 @@ import ( "html" "net/http" "strconv" + "github.com/gorilla/mux" "github.com/NyaaPantsu/nyaa/model" "github.com/NyaaPantsu/nyaa/util" "github.com/NyaaPantsu/nyaa/util/log" msg "github.com/NyaaPantsu/nyaa/util/messages" "github.com/NyaaPantsu/nyaa/util/search" - "github.com/gorilla/mux" ) // SearchHandler : Controller for displaying search result page, accepting common search arguments func SearchHandler(w http.ResponseWriter, r *http.Request) { + var err error + // TODO Don't create a new client for each request + messages := msg.GetMessages(r) + // TODO Fallback to postgres search if es is down + vars := mux.Vars(r) page := vars["page"] - messages := msg.GetMessages(r) // db params url - var err error pagenum := 1 if page != "" { pagenum, err = strconv.Atoi(html.EscapeString(page)) @@ -40,17 +43,16 @@ func SearchHandler(w http.ResponseWriter, r *http.Request) { return } - b := model.TorrentsToJSON(torrents) - - common := newCommonVariables(r) - common.Navigation = navigation{nbTorrents, int(searchParam.Max), pagenum, "search_page"} + commonVar := newCommonVariables(r) + commonVar.Navigation = navigation{int(nbTorrents), int(searchParam.Max), int(searchParam.Page), "search_page"} // Convert back to strings for now. - common.Search = searchForm{ + // TODO Deprecate fully SearchParam and only use TorrentParam + commonVar.Search = searchForm{ SearchParam: searchParam, Category: searchParam.Category.String(), ShowItemsPerPage: true, } - htv := modelListVbs{common, b, messages.GetAllErrors(), messages.GetAllInfos()} + htv := modelListVbs{commonVar, model.TorrentsToJSON(torrents), messages.GetAllErrors(), messages.GetAllInfos()} err = searchTemplate.ExecuteTemplate(w, "index.html", htv) if err != nil { diff --git a/router/upload_handler.go b/router/upload_handler.go index 276d108f..3e689d51 100644 --- a/router/upload_handler.go +++ b/router/upload_handler.go @@ -5,6 +5,7 @@ import ( "net/http" "strconv" "time" + elastic "gopkg.in/olivere/elastic.v5" "github.com/NyaaPantsu/nyaa/config" "github.com/NyaaPantsu/nyaa/db" @@ -16,6 +17,7 @@ import ( "github.com/NyaaPantsu/nyaa/service/user" "github.com/NyaaPantsu/nyaa/service/user/permission" "github.com/NyaaPantsu/nyaa/util/languages" + "github.com/NyaaPantsu/nyaa/util/log" msg "github.com/NyaaPantsu/nyaa/util/messages" ) @@ -83,8 +85,23 @@ func UploadPostHandler(w http.ResponseWriter, r *http.Request) { Description: uploadForm.Description, WebsiteLink: uploadForm.WebsiteLink, UploaderID: user.ID} + + db.ORM.Create(&torrent) + client, err := elastic.NewClient() + if err == nil { + err = torrent.AddToESIndex(client) + if err == nil { + log.Infof("Successfully added torrent to ES index.") + } else { + log.Errorf("Unable to add torrent to ES index: %s", err) + } + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + } + + url, err := Router.Get("view_torrent").URL("id", strconv.FormatUint(uint64(torrent.ID), 10)) if user.ID > 0 && config.DefaultUserSettings["new_torrent"] { // If we are a member and notifications for new torrents are enabled diff --git a/service/torrent/torrent.go b/service/torrent/torrent.go index bff14fa0..5f6c6d3e 100644 --- a/service/torrent/torrent.go +++ b/service/torrent/torrent.go @@ -2,6 +2,7 @@ package torrentService import ( "errors" + elastic "gopkg.in/olivere/elastic.v5" "net/http" "strconv" "strings" @@ -11,7 +12,7 @@ import ( "github.com/NyaaPantsu/nyaa/model" "github.com/NyaaPantsu/nyaa/service" "github.com/NyaaPantsu/nyaa/util" - // "github.com/NyaaPantsu/nyaa/util/log" + "github.com/NyaaPantsu/nyaa/util/log" ) /* Function to interact with Models @@ -203,6 +204,19 @@ func DeleteTorrent(id string) (int, error) { if db.ORM.Delete(&torrent).Error != nil { return http.StatusInternalServerError, errors.New("Torrent was not deleted.") } + + // TODO Don't create a new client for each request + client, err := elastic.NewClient() + if err == nil { + err = torrent.DeleteFromESIndex(client) + if err == nil { + log.Infof("Successfully deleted torrent to ES index.") + } else { + log.Errorf("Unable to delete torrent to ES index: %s", err) + } + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + } return http.StatusOK, nil } @@ -214,6 +228,19 @@ func DefinitelyDeleteTorrent(id string) (int, error) { if db.ORM.Unscoped().Model(&torrent).Delete(&torrent).Error != nil { return http.StatusInternalServerError, errors.New("Torrent was not deleted.") } + + // TODO Don't create a new client for each request + client, err := elastic.NewClient() + if err == nil { + err = torrent.DeleteFromESIndex(client) + if err == nil { + log.Infof("Successfully deleted torrent to ES index.") + } else { + log.Errorf("Unable to delete torrent to ES index: %s", err) + } + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + } return http.StatusOK, nil } @@ -238,6 +265,19 @@ func UpdateTorrent(torrent model.Torrent) (int, error) { return http.StatusInternalServerError, errors.New("Torrent was not updated.") } + // TODO Don't create a new client for each request + client, err := elastic.NewClient() + if err == nil { + err = torrent.AddToESIndex(client) + if err == nil { + log.Infof("Successfully updated torrent to ES index.") + } else { + log.Errorf("Unable to update torrent to ES index: %s", err) + } + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + } + return http.StatusOK, nil } diff --git a/util/search/search.go b/util/search/search.go index 100b1bad..a89f1427 100644 --- a/util/search/search.go +++ b/util/search/search.go @@ -6,6 +6,7 @@ import ( "strings" "unicode" "unicode/utf8" + elastic "gopkg.in/olivere/elastic.v5" "github.com/NyaaPantsu/nyaa/cache" "github.com/NyaaPantsu/nyaa/common" @@ -63,8 +64,41 @@ func SearchByQueryDeleted(r *http.Request, pagenum int) (search common.SearchPar return } +// TODO Clean this up +// FIXME Some fields are not used by elasticsearch (pagenum, countAll, deleted, withUser) +// pagenum is extracted from request in .FromRequest() +// elasticsearch always provide a count to how many hits +// deleted is unused because es doesn't index deleted torrents func searchByQuery(r *http.Request, pagenum int, countAll bool, withUser bool, deleted bool) ( search common.SearchParam, tor []model.Torrent, count int, err error, +) { + client, err := elastic.NewClient() + if err == nil { + var torrentParam common.TorrentParam + torrentParam.FromRequest(r) + totalHits, torrents, err := torrentParam.Find(client) + searchParam := common.SearchParam{ + Order: torrentParam.Order, + Status: torrentParam.Status, + Sort: torrentParam.Sort, + Category: torrentParam.Category, + Page: int(torrentParam.Offset), + UserID: uint(torrentParam.UserID), + Max: uint(torrentParam.Max), + NotNull: torrentParam.NotNull, + Query: torrentParam.NameLike, + } + // Convert back to non-json torrents + return searchParam, torrents, int(totalHits), err + } else { + log.Errorf("Unable to create elasticsearch client: %s", err) + log.Errorf("Falling back to postgresql query") + return searchByQueryPostgres(r, pagenum, countAll, withUser, deleted) + } +} + +func searchByQueryPostgres(r *http.Request, pagenum int, countAll bool, withUser bool, deleted bool) ( + search common.SearchParam, tor []model.Torrent, count int, err error, ) { max, err := strconv.ParseUint(r.URL.Query().Get("max"), 10, 32) if err != nil {