From 360b35a08f3b1a67ea3b0b9f40a318d58e125162 Mon Sep 17 00:00:00 2001 From: tomleb Date: Tue, 30 May 2017 22:22:12 -0400 Subject: [PATCH] Add reindexing every 5 minutes, and a bunch of other things (#852) * Fix error messages with ES results * Add lsof for debugging * Add torrents table variable to index sukebei * Use elasticsearch alias for hotswapping index * Increase max open files, increase ES heap size * Add reindex script and reindex triggers We use a table to store the actions that happened to the torrents table. When a row of the torrents table is INSERTED/UPDATED/DELETED, the trigger kicks in and an entry is added to the reindex_torrents table. The reindex_nyaapantsu.py script is then used to query the reindex_torrents table and apply the correct reindex action to elasticsearch. The entries are then removed from the reindex_torrents table. * Reindex every 5 minutes as cronjob --- deploy/ansible/create_elasticsearch_index.yml | 5 ++ deploy/ansible/group_vars/all | 14 +++- .../ansible/populate_elasticsearch_index.yml | 1 + deploy/ansible/roles/common/tasks/main.yml | 6 ++ .../files/elasticsearch_settings.yml | 1 + .../elasticsearch/files/index_nyaapantsu.py | 30 ++++---- .../elasticsearch/files/reindex_nyaapantsu.py | 71 +++++++++++++++++++ .../roles/elasticsearch/tasks/main.yml | 47 +++++++++++- .../templates/elasticsearch.override.j2 | 3 + .../elasticsearch/templates/reindex_cron.j2 | 5 ++ .../templates/reindex_triggers.sql.j2 | 38 ++++++++++ .../ansible/roles/elasticsearch/vars/main.yml | 3 + 12 files changed, 207 insertions(+), 17 deletions(-) create mode 100644 deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py create mode 100644 deploy/ansible/roles/elasticsearch/templates/elasticsearch.override.j2 create mode 100644 deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2 create mode 100644 deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2 create mode 100644 deploy/ansible/roles/elasticsearch/vars/main.yml diff --git 
a/deploy/ansible/create_elasticsearch_index.yml b/deploy/ansible/create_elasticsearch_index.yml index 2f3c65d7..bf002bdf 100644 --- a/deploy/ansible/create_elasticsearch_index.yml +++ b/deploy/ansible/create_elasticsearch_index.yml @@ -16,3 +16,8 @@ url: "http://localhost:9200/{{ nyaapantsu_elasticsearch_index }}" method: PUT body: "{{ config.stdout }}" + + - name: Create alias + uri: + url: "http://localhost:9200/{{ nyaapantsu_elasticsearch_index }}/_alias/{{ nyaapantsu_elasticsearch_alias }}" + method: PUT diff --git a/deploy/ansible/group_vars/all b/deploy/ansible/group_vars/all index e50c99bc..a7fcecec 100644 --- a/deploy/ansible/group_vars/all +++ b/deploy/ansible/group_vars/all @@ -4,5 +4,17 @@ nyaapantsu_password: nyaapantsu nyaapantsu_pgpool_port: 9998 nyaapantsu_directory: /nyaapantsu/ nyaapantsu_gpg_passphrase_file: "{{ nyaapantsu_directory }}/passphrase" -nyaapantsu_elasticsearch_index: nyaapantsu + +nyaapantsu_elasticsearch_alias: nyaapantsu +# nyaapantsu_elasticsearch_alias: sukebei + +nyaapantsu_elasticsearch_index: nyaapantsu_v1 +# nyaapantsu_elasticsearch_index: nyaapantsu_sukebei_v1 + +nyaapantsu_torrent_tablename: torrents +# nyaapantsu_torrent_tablename: torrents_sukebei + +nyaapantsu_max_open_files: 200000 +nyaapantsu_jvm_heapsize_gb: 1 + # vim: ft=yaml diff --git a/deploy/ansible/populate_elasticsearch_index.yml b/deploy/ansible/populate_elasticsearch_index.yml index fe3720d2..a54aaf22 100644 --- a/deploy/ansible/populate_elasticsearch_index.yml +++ b/deploy/ansible/populate_elasticsearch_index.yml @@ -11,3 +11,4 @@ environment: PANTSU_DBPARAMS: "host=localhost port={{ nyaapantsu_pgpool_port }} user={{ nyaapantsu_user }} dbname={{ nyaapantsu_dbname }} sslmode=disable password={{ nyaapantsu_password }}" PANTSU_ELASTICSEARCH_INDEX: "{{ nyaapantsu_elasticsearch_index }}" + PANTSU_TORRENT_TABLENAME: "{{ nyaapantsu_torrent_tablename }}" diff --git a/deploy/ansible/roles/common/tasks/main.yml b/deploy/ansible/roles/common/tasks/main.yml index 
aa878067..39fa156c 100644 --- a/deploy/ansible/roles/common/tasks/main.yml +++ b/deploy/ansible/roles/common/tasks/main.yml @@ -5,3 +5,9 @@ mode: 0755 owner: "{{ ansible_ssh_user }}" become: true + +- name: Install useful stuff + yum: + name: lsof + state: present + become: true diff --git a/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml b/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml index bcee849c..8e5be802 100644 --- a/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml +++ b/deploy/ansible/roles/elasticsearch/files/elasticsearch_settings.yml @@ -18,6 +18,7 @@ settings: index: number_of_shards: 1 number_of_replicas: 0 + max_result_window: 30000 mappings: torrents: diff --git a/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py b/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py index f4252448..307d88bd 100644 --- a/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py +++ b/deploy/ansible/roles/elasticsearch/files/index_nyaapantsu.py @@ -4,20 +4,18 @@ import psycopg2, pprint, sys, time, os CHUNK_SIZE = 10000 -dbparams = '' -pantsu_index = '' +def getEnvOrExit(var): + environment = '' + try: + environment = os.environ[var] + except: + print('[Error]: Environment variable ' + var + ' not defined.') + sys.exit(1) + return environment -try: - dbparams = os.environ['PANTSU_DBPARAMS'] -except: - print('[Error]: Environment variable PANTSU_DBPARAMS not defined.') - sys.exit(1) - -try: - pantsu_index = os.environ['PANTSU_ELASTICSEARCH_INDEX'] -except: - print('[Error]: Environment variable PANTSU_ELASTICSEARCH_INDEX not defined.') - sys.exit(1) +dbparams = getEnvOrExit('PANTSU_DBPARAMS') +pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX') +torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME') es = Elasticsearch() pgconn = psycopg2.connect(dbparams) @@ -25,8 +23,8 @@ pgconn = psycopg2.connect(dbparams) cur = pgconn.cursor() cur.execute("""SELECT torrent_id, torrent_name, 
category, sub_category, status, torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed - FROM torrents - WHERE deleted_at IS NULL""") + FROM {torrent_tablename} + WHERE deleted_at IS NULL""".format(torrent_tablename=torrent_tablename)) fetches = cur.fetchmany(CHUNK_SIZE) while fetches: @@ -57,3 +55,5 @@ while fetches: helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120) del(fetches) fetches = cur.fetchmany(CHUNK_SIZE) +cur.close() +pgconn.close() diff --git a/deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py b/deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py new file mode 100644 index 00000000..4c9abe3c --- /dev/null +++ b/deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py @@ -0,0 +1,71 @@ +# coding: utf-8 +from elasticsearch import Elasticsearch, helpers +import psycopg2, pprint, sys, time, os + +CHUNK_SIZE = 10000 + +def getEnvOrExit(var): + environment = '' + try: + environment = os.environ[var] + except: + print('[Error]: Environment variable ' + var + ' not defined.') + sys.exit(1) + return environment + +dbparams = getEnvOrExit('PANTSU_DBPARAMS') +pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX') +torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME') + +es = Elasticsearch() +pgconn = psycopg2.connect(dbparams) + +cur = pgconn.cursor() +# We MUST use NO QUERY CACHE because the values are inserted by triggers and +# not through pgpool. 
+cur.execute("""/*NO QUERY CACHE*/ SELECT reindex_torrents_id, torrent_id, action FROM reindex_torrents""") # reindex_torrents_id is the SERIAL column created by reindex_triggers.sql.j2 + +fetches = cur.fetchmany(CHUNK_SIZE) +while fetches: + actions = list() + delete_cur = pgconn.cursor() + for reindex_id, torrent_id, action in fetches: + new_action = { + '_op_type': action, + '_index': pantsu_index, + '_type': 'torrents', + '_id': torrent_id + } + if action == 'index': + select_cur = pgconn.cursor() + select_cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status, + torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed + FROM {torrent_tablename} + WHERE torrent_id = {torrent_id}""".format(torrent_id=torrent_id, torrent_tablename=torrent_tablename)) + torrent_id, torrent_name, category, sub_category, status, torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed = select_cur.fetchone() + doc = { + 'id': torrent_id, + 'name': torrent_name.decode('utf-8'), + 'category': str(category), + 'sub_category': str(sub_category), + 'status': status, + 'hash': torrent_hash, + 'date': date, + 'uploader_id': uploader, + 'downloads': downloads, + 'filesize': filesize, + 'seeders': seeders, + 'leechers': leechers, + 'completed': completed + } + new_action['_source'] = doc + select_cur.close() + delete_cur.execute('DELETE FROM reindex_torrents WHERE reindex_torrents_id = {reindex_id}'.format(reindex_id=reindex_id)) + actions.append(new_action) + pgconn.commit() # Commit the deletes transaction + delete_cur.close() + helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120) + del(fetches) + fetches = cur.fetchmany(CHUNK_SIZE) +cur.close() +pgconn.close() diff --git a/deploy/ansible/roles/elasticsearch/tasks/main.yml b/deploy/ansible/roles/elasticsearch/tasks/main.yml index 7facfc98..f2ed1996 100644 --- a/deploy/ansible/roles/elasticsearch/tasks/main.yml +++ b/deploy/ansible/roles/elasticsearch/tasks/main.yml @@ -32,11 +32,56 @@ - elasticsearch_settings.yml - index_nyaapantsu.py +- name: Increase system max 
open files + lineinfile: + path: /etc/sysctl.conf + regexp: '^fs.file-max.*' + line: 'fs.file-max = {{ nyaapantsu_max_open_files }}' + become: true + +- name: Increase JVM heapsize + lineinfile: + path: /etc/sysconfig/elasticsearch + regexp: '^ES_JAVA_OPTS=.*' + line: 'ES_JAVA_OPTS="-Xms{{ nyaapantsu_jvm_heapsize_gb }}g -Xmx{{ nyaapantsu_jvm_heapsize_gb }}g"' + become: true + +- name: Create override folder + file: + path: /etc/systemd/system/elasticsearch.service.d/ + state: directory + mode: 0755 + become: true + +- name: Add override service file + template: + src: elasticsearch.override.j2 + dest: /etc/systemd/system/elasticsearch.service.d/override.conf + become: true + - name: Enable and start elasticsearch systemd: enabled: yes name: elasticsearch state: started + daemon_reload: yes become: true -# TODO Create / update index +- name: Copy reindexing triggers + template: + src: reindex_triggers.sql.j2 + dest: "{{ nyaapantsu_directory }}/reindex_triggers.sql" + +- name: Apply reindexing triggers + shell: psql -U "{{ nyaapantsu_user }}" "{{ nyaapantsu_dbname }}" < "{{ nyaapantsu_directory }}/reindex_triggers.sql" + +- name: Copy reindexing script + copy: + src: reindex_nyaapantsu.py + dest: "{{ elasticsearch_reindex_script }}" + +- name: Setup reindexing cron job + template: + src: reindex_cron.j2 + dest: "/etc/cron.d/reindex_{{ nyaapantsu_torrent_tablename }}" + become: true diff --git a/deploy/ansible/roles/elasticsearch/templates/elasticsearch.override.j2 b/deploy/ansible/roles/elasticsearch/templates/elasticsearch.override.j2 new file mode 100644 index 00000000..a143faaa --- /dev/null +++ b/deploy/ansible/roles/elasticsearch/templates/elasticsearch.override.j2 @@ -0,0 +1,3 @@ +[Service] +LimitNOFILE= +LimitNOFILE={{ nyaapantsu_max_open_files }} diff --git a/deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2 b/deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2 new file mode 100644 index 00000000..19d30e5f --- /dev/null +++ 
b/deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2 @@ -0,0 +1,5 @@ +PANTSU_DBPARAMS="host=localhost port={{ nyaapantsu_pgpool_port }} user={{ nyaapantsu_user }} dbname={{ nyaapantsu_dbname }} password={{ nyaapantsu_password }} sslmode=disable" +PANTSU_ELASTICSEARCH_INDEX="{{ nyaapantsu_elasticsearch_alias }}" +PANTSU_TORRENT_TABLENAME="{{ nyaapantsu_torrent_tablename }}" +{{ elasticsearch_cron_minutes }} * * * * {{ ansible_ssh_user }} python {{ elasticsearch_reindex_script }} + diff --git a/deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2 b/deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2 new file mode 100644 index 00000000..8a8f4e41 --- /dev/null +++ b/deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2 @@ -0,0 +1,38 @@ +-- Matches the _op_type values from elasticsearch bulk API +-- https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html + +CREATE TYPE torrents_action AS ENUM ('index', 'delete'); + +CREATE TABLE IF NOT EXISTS reindex_torrents ( + reindex_torrents_id SERIAL, + torrent_id int, + action torrents_action +); + +CREATE OR REPLACE FUNCTION add_reindex_torrents_action() RETURNS TRIGGER AS $$ + BEGIN + IF (TG_OP = 'INSERT') THEN + INSERT INTO reindex_torrents (torrent_id, action) VALUES (NEW.torrent_id, 'index'); + RETURN NEW; + ELSIF (TG_OP = 'UPDATE') THEN + IF (NEW.deleted_at IS NOT NULL) THEN + INSERT INTO reindex_torrents (torrent_id, action) VALUES (OLD.torrent_id, 'delete'); + RETURN NEW; + ELSE + INSERT INTO reindex_torrents (torrent_id, action) VALUES (NEW.torrent_id, 'index'); + RETURN NEW; + END IF; + ELSIF (TG_OP = 'DELETE') THEN + INSERT INTO reindex_torrents (torrent_id, action) VALUES (OLD.torrent_id, 'delete'); + RETURN OLD; + END IF; + RETURN NULL; -- result is ignored since this is an AFTER trigger + END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trigger_reindex_torrents ON {{ nyaapantsu_torrent_tablename }}; +CREATE TRIGGER trigger_reindex_torrents +AFTER INSERT OR UPDATE 
OR DELETE ON {{ nyaapantsu_torrent_tablename }} + FOR EACH ROW EXECUTE PROCEDURE add_reindex_torrents_action(); + +-- vim: ft=sql diff --git a/deploy/ansible/roles/elasticsearch/vars/main.yml b/deploy/ansible/roles/elasticsearch/vars/main.yml new file mode 100644 index 00000000..c764c4eb --- /dev/null +++ b/deploy/ansible/roles/elasticsearch/vars/main.yml @@ -0,0 +1,3 @@ +# Run job every 5 minutes +elasticsearch_cron_minutes: "*/5" +elasticsearch_reindex_script: "{{ nyaapantsu_directory }}/reindex_nyaapantsu.py"