Add reindexing every 5 minutes, and a bunch of other things (#852)
* Fix error messages with ES results * Add lsof for debugging * Add torrents table variable to index sukebei * Use elasticsearch alias for hotswapping index * Increase max open files, increase ES heap size * Add reindex script and reindex triggers We use a table to store the actions that happened to the torrents table. When a row in the torrents table is INSERTED/UPDATED/DELETED, the trigger kicks in and an entry is added to the reindex_torrents table. The reindex_nyaapantsu.py script is then used to query the reindex_torrents table and apply the correct reindex action to elasticsearch. The entries are then removed from the reindex_torrents table. * Reindex every 5 minutes as cronjob
Cette révision appartient à :
Parent
5376b9e271
révision
360b35a08f
12 fichiers modifiés avec 207 ajouts et 17 suppressions
|
@ -16,3 +16,8 @@
|
|||
url: "http://localhost:9200/{{ nyaapantsu_elasticsearch_index }}"
|
||||
method: PUT
|
||||
body: "{{ config.stdout }}"
|
||||
|
||||
- name: Create alias
|
||||
uri:
|
||||
url: "http://localhost:9200/{{ nyaapantsu_elasticsearch_index }}/_alias/{{ nyaapantsu_elasticsearch_alias }}"
|
||||
method: PUT
|
||||
|
|
|
@ -4,5 +4,17 @@ nyaapantsu_password: nyaapantsu
|
|||
nyaapantsu_pgpool_port: 9998
|
||||
nyaapantsu_directory: /nyaapantsu/
|
||||
nyaapantsu_gpg_passphrase_file: "{{ nyaapantsu_directory }}/passphrase"
|
||||
nyaapantsu_elasticsearch_index: nyaapantsu
|
||||
|
||||
nyaapantsu_elasticsearch_alias: nyaapantsu
|
||||
# nyaapantsu_elasticsearch_alias: sukebei
|
||||
|
||||
nyaapantsu_elasticsearch_index: nyaapantsu_v1
|
||||
# nyaapantsu_elasticsearch_index: nyaapantsu_sukebei_v1
|
||||
|
||||
nyaapantsu_torrent_tablename: torrents
|
||||
# nyaapantsu_torrent_tablename: torrents_sukebei
|
||||
|
||||
nyaapantsu_max_open_files: 200000
|
||||
nyaapantsu_jvm_heapsize_gb: 1
|
||||
|
||||
# vim: ft=yaml
|
||||
|
|
|
@ -11,3 +11,4 @@
|
|||
environment:
|
||||
PANTSU_DBPARAMS: "host=localhost port={{ nyaapantsu_pgpool_port }} user={{ nyaapantsu_user }} dbname={{ nyaapantsu_dbname }} sslmode=disable password={{ nyaapantsu_password }}"
|
||||
PANTSU_ELASTICSEARCH_INDEX: "{{ nyaapantsu_elasticsearch_index }}"
|
||||
PANTSU_TORRENT_TABLENAME: "{{ nyaapantsu_torrent_tablename }}"
|
||||
|
|
|
@ -5,3 +5,9 @@
|
|||
mode: 0755
|
||||
owner: "{{ ansible_ssh_user }}"
|
||||
become: true
|
||||
|
||||
- name: Install useful stuff
|
||||
yum:
|
||||
name: lsof
|
||||
state: present
|
||||
become: true
|
||||
|
|
|
@ -18,6 +18,7 @@ settings:
|
|||
index:
|
||||
number_of_shards: 1
|
||||
number_of_replicas: 0
|
||||
max_result_window: 30000
|
||||
|
||||
mappings:
|
||||
torrents:
|
||||
|
|
|
@ -4,20 +4,18 @@ import psycopg2, pprint, sys, time, os
|
|||
|
||||
CHUNK_SIZE = 10000
|
||||
|
||||
dbparams = ''
|
||||
pantsu_index = ''
|
||||
def getEnvOrExit(var):
|
||||
environment = ''
|
||||
try:
|
||||
environment = os.environ[var]
|
||||
except:
|
||||
print('[Error]: Environment variable ' + var + ' not defined.')
|
||||
sys.exit(1)
|
||||
return environment
|
||||
|
||||
try:
|
||||
dbparams = os.environ['PANTSU_DBPARAMS']
|
||||
except:
|
||||
print('[Error]: Environment variable PANTSU_DBPARAMS not defined.')
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
pantsu_index = os.environ['PANTSU_ELASTICSEARCH_INDEX']
|
||||
except:
|
||||
print('[Error]: Environment variable PANTSU_ELASTICSEARCH_INDEX not defined.')
|
||||
sys.exit(1)
|
||||
dbparams = getEnvOrExit('PANTSU_DBPARAMS')
|
||||
pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX')
|
||||
torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME')
|
||||
|
||||
es = Elasticsearch()
|
||||
pgconn = psycopg2.connect(dbparams)
|
||||
|
@ -25,8 +23,8 @@ pgconn = psycopg2.connect(dbparams)
|
|||
cur = pgconn.cursor()
|
||||
cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status,
|
||||
torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed
|
||||
FROM torrents
|
||||
WHERE deleted_at IS NULL""")
|
||||
FROM {torrent_tablename}
|
||||
WHERE deleted_at IS NULL""".format(torrent_tablename=torrent_tablename))
|
||||
|
||||
fetches = cur.fetchmany(CHUNK_SIZE)
|
||||
while fetches:
|
||||
|
@ -57,3 +55,5 @@ while fetches:
|
|||
helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120)
|
||||
del(fetches)
|
||||
fetches = cur.fetchmany(CHUNK_SIZE)
|
||||
cur.close()
|
||||
pgconn.close()
|
||||
|
|
71
deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py
Fichier normal
71
deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py
Fichier normal
|
@ -0,0 +1,71 @@
|
|||
# coding: utf-8
"""Incremental Elasticsearch reindexer for the torrents table.

Consumes the reindex_torrents queue table (filled by database triggers on
INSERT/UPDATE/DELETE of the torrents table), replays each pending
'index'/'delete' action against Elasticsearch through the bulk API, and
removes the processed queue rows.  Meant to be run periodically (e.g. from
cron every 5 minutes).

Required environment variables:
    PANTSU_DBPARAMS            psycopg2 connection string
    PANTSU_ELASTICSEARCH_INDEX target Elasticsearch index (or alias)
    PANTSU_TORRENT_TABLENAME   name of the source torrents table
"""
from elasticsearch import Elasticsearch, helpers
import os
import sys

import psycopg2

# Queue rows fetched per DB round trip and documents sent per bulk request.
CHUNK_SIZE = 10000


def getEnvOrExit(var):
    """Return the value of environment variable ``var``, or exit(1) if unset."""
    try:
        return os.environ[var]
    except KeyError:
        print('[Error]: Environment variable ' + var + ' not defined.')
        sys.exit(1)


dbparams = getEnvOrExit('PANTSU_DBPARAMS')
pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX')
torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME')

es = Elasticsearch()
pgconn = psycopg2.connect(dbparams)

cur = pgconn.cursor()
# We MUST use NO QUERY CACHE because the rows are inserted by triggers and
# not through pgpool, so pgpool's query cache may be stale.
# The queue's key column is reindex_torrents_id (see reindex_triggers.sql.j2),
# not "id".
cur.execute("""/*NO QUERY CACHE*/ SELECT reindex_torrents_id, torrent_id, action
FROM reindex_torrents""")

fetches = cur.fetchmany(CHUNK_SIZE)
while fetches:
    actions = []
    delete_cur = pgconn.cursor()
    for reindex_id, torrent_id, action in fetches:
        new_action = {
            '_op_type': action,  # 'index' or 'delete', matching the ES bulk API
            '_index': pantsu_index,
            '_type': 'torrents',
            '_id': torrent_id
        }
        if action == 'index':
            select_cur = pgconn.cursor()
            # The table name comes from a trusted environment variable; the
            # torrent id is passed as a bound parameter, not string-formatted.
            select_cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status,
torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed
FROM {torrent_tablename}
WHERE torrent_id = %s""".format(torrent_tablename=torrent_tablename),
                               (torrent_id,))
            row = select_cur.fetchone()
            select_cur.close()
            if row is None:
                # The torrent disappeared between the trigger firing and now;
                # just drop the queue entry and skip the ES action.
                delete_cur.execute(
                    'DELETE FROM reindex_torrents WHERE reindex_torrents_id = %s',
                    (reindex_id,))
                continue
            (torrent_id, torrent_name, category, sub_category, status,
             torrent_hash, date, uploader, downloads, filesize, seeders,
             leechers, completed) = row
            # The driver may return text columns as bytes or str depending on
            # Python version / connection settings; normalize to unicode.
            if isinstance(torrent_name, bytes):
                torrent_name = torrent_name.decode('utf-8')
            new_action['_source'] = {
                'id': torrent_id,
                'name': torrent_name,
                'category': str(category),
                'sub_category': str(sub_category),
                'status': status,
                'hash': torrent_hash,
                'date': date,
                'uploader_id': uploader,
                'downloads': downloads,
                'filesize': filesize,
                'seeders': seeders,
                'leechers': leechers,
                'completed': completed
            }
        delete_cur.execute(
            'DELETE FROM reindex_torrents WHERE reindex_torrents_id = %s',
            (reindex_id,))
        actions.append(new_action)
    # Apply to Elasticsearch FIRST and only then commit the queue deletions,
    # so a failed bulk request leaves the queue intact for the next run.
    helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120)
    pgconn.commit()  # Commit the deletes transaction
    delete_cur.close()
    del fetches
    fetches = cur.fetchmany(CHUNK_SIZE)
cur.close()
pgconn.close()
|
|
@ -32,11 +32,56 @@
|
|||
- elasticsearch_settings.yml
|
||||
- index_nyaapantsu.py
|
||||
|
||||
- name: Increase system max open files
|
||||
lineinfile:
|
||||
path: /etc/sysctl.conf
|
||||
regexp: '^fs.file-max.*'
|
||||
line: 'fs.file-max = {{ nyaapantsu_max_open_files }}'
|
||||
become: true
|
||||
|
||||
- name: Increase JVM heapsize
|
||||
lineinfile:
|
||||
path: /etc/sysconfig/elasticsearch
|
||||
regexp: '^ES_JAVA_OPTS=.*'
|
||||
line: 'ES_JAVA_OPTS="-Xms{{ nyaapantsu_jvm_heapsize_gb }}g -Xmx{{ nyaapantsu_jvm_heapsize_gb }}g"'
|
||||
become: true
|
||||
|
||||
- name: Create override folder
|
||||
file:
|
||||
path: /etc/systemd/system/elasticsearch.service.d/
|
||||
state: directory
|
||||
mode: 0755
|
||||
become: true
|
||||
|
||||
- name: Add override service file
|
||||
template:
|
||||
src: elasticsearch.override.j2
|
||||
dest: /etc/systemd/system/elasticsearch.service.d/override.conf
|
||||
become: true
|
||||
|
||||
- name: Enable and start elasticsearch
|
||||
systemd:
|
||||
enabled: yes
|
||||
name: elasticsearch
|
||||
state: started
|
||||
daemon_reload: yes
|
||||
become: true
|
||||
|
||||
# TODO Create / update index
|
||||
- name: Copy reindexing triggers
|
||||
template:
|
||||
src: reindex_triggers.sql.j2
|
||||
dest: "{{ nyaapantsu_directory }}/reindex_triggers.sql"
|
||||
|
||||
- name: Apply reindexing triggers
|
||||
shell: psql -U "{{ nyaapantsu_user }}" "{{ nyaapantsu_dbname }}" < "{{ nyaapantsu_directory }}/reindex_triggers.sql"
|
||||
|
||||
- name: Copy reindexing script
|
||||
copy:
|
||||
src: reindex_nyaapantsu.py
|
||||
dest: "{{ elasticsearch_reindex_script }}"
|
||||
|
||||
- name: Setup reindexing cron job
|
||||
template:
|
||||
src: reindex_cron.j2
|
||||
dest: "/etc/cron.d/reindex_{{ nyaapantsu_torrent_tablename }}"
|
||||
become: true
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# systemd drop-in for elasticsearch.service: raise the open-file limit.
[Service]
# The empty assignment first clears any LimitNOFILE inherited from the base
# unit; the second line then sets the new value.
LimitNOFILE=
LimitNOFILE={{ nyaapantsu_max_open_files }}
|
5
deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2
Fichier normal
5
deploy/ansible/roles/elasticsearch/templates/reindex_cron.j2
Fichier normal
|
@ -0,0 +1,5 @@
|
|||
# Environment for reindex_nyaapantsu.py; mirrors the PANTSU_DBPARAMS the web
# app uses (dbname was previously missing and only worked when the database
# name happened to equal the user name — TODO confirm nyaapantsu_dbname is
# defined wherever this role runs).
PANTSU_DBPARAMS="host=localhost port={{ nyaapantsu_pgpool_port }} user={{ nyaapantsu_user }} dbname={{ nyaapantsu_dbname }} password={{ nyaapantsu_password }} sslmode=disable"
PANTSU_ELASTICSEARCH_INDEX="{{ nyaapantsu_elasticsearch_alias }}"
PANTSU_TORRENT_TABLENAME="{{ nyaapantsu_torrent_tablename }}"
# /etc/cron.d format: minute hour dom month dow user command
{{ elasticsearch_cron_minutes }} * * * * {{ ansible_ssh_user }} python {{ elasticsearch_reindex_script }}
|
||||
|
38
deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2
Fichier normal
38
deploy/ansible/roles/elasticsearch/templates/reindex_triggers.sql.j2
Fichier normal
|
@ -0,0 +1,38 @@
|
|||
-- Matches the _op_type values from elasticsearch bulk API
-- https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

-- CREATE TYPE has no IF NOT EXISTS form, so swallow the duplicate_object
-- error to keep this script idempotent like the statements below (the
-- playbook re-applies it on every run).
DO $do$
BEGIN
    CREATE TYPE torrents_action AS ENUM ('index', 'delete');
EXCEPTION WHEN duplicate_object THEN
    NULL;
END;
$do$;

-- Queue of pending reindex actions, consumed (and emptied) by
-- reindex_nyaapantsu.py.  The primary key lets the consumer delete
-- processed rows efficiently by reindex_torrents_id.
CREATE TABLE IF NOT EXISTS reindex_torrents (
    reindex_torrents_id SERIAL PRIMARY KEY,
    torrent_id int,
    action torrents_action
);

-- Row-level trigger body: record one queue entry per data change.
-- A soft delete (deleted_at set on UPDATE) is treated as a 'delete'.
CREATE OR REPLACE FUNCTION add_reindex_torrents_action() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        INSERT INTO reindex_torrents (torrent_id, action) VALUES (NEW.torrent_id, 'index');
        RETURN NEW;
    ELSIF (TG_OP = 'UPDATE') THEN
        IF (NEW.deleted_at IS NOT NULL) THEN
            INSERT INTO reindex_torrents (torrent_id, action) VALUES (OLD.torrent_id, 'delete');
            RETURN NEW;
        ELSE
            INSERT INTO reindex_torrents (torrent_id, action) VALUES (NEW.torrent_id, 'index');
            RETURN NEW;
        END IF;
    ELSIF (TG_OP = 'DELETE') THEN
        INSERT INTO reindex_torrents (torrent_id, action) VALUES (OLD.torrent_id, 'delete');
        RETURN OLD;
    END IF;
    RETURN NULL; -- result is ignored since this is an AFTER trigger
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS trigger_reindex_torrents ON {{ nyaapantsu_torrent_tablename }};
CREATE TRIGGER trigger_reindex_torrents
AFTER INSERT OR UPDATE OR DELETE ON {{ nyaapantsu_torrent_tablename }}
FOR EACH ROW EXECUTE PROCEDURE add_reindex_torrents_action();

-- vim: ft=sql
|
3
deploy/ansible/roles/elasticsearch/vars/main.yml
Fichier normal
3
deploy/ansible/roles/elasticsearch/vars/main.yml
Fichier normal
|
@ -0,0 +1,3 @@
|
|||
# Run job every 5 minutes
elasticsearch_cron_minutes: "*/5"
# Destination path the reindex_nyaapantsu.py script is copied to on the
# target host; referenced by both the copy task and the cron template.
elasticsearch_reindex_script: "{{ nyaapantsu_directory }}/reindex_nyaapantsu.py"
|
Référencer dans un nouveau ticket