Albirew/nyaa-pantsu
Archived
This repository was archived on 2022-05-07. You can view its files or clone it, but you cannot open issues or pull requests, nor push changes.
nyaa-pantsu/deploy/ansible/roles/elasticsearch/files/reindex_nyaapantsu.py
tomleb 360b35a08f Add reindexing every 5 minutes, and a bunch of other things (#852)
* Fix error messages with ES results

* Add lsof for debugging

* Add torrents table variable to index sukebei

* Use elasticsearch alias for hotswapping index

* Increase max open files, increase ES heap size

* Add reindex script and reindex triggers

We use a table to store the actions performed on the torrents table.
When a row of the torrents table is INSERTED/UPDATED/DELETED, the trigger
fires and an entry is added to the reindex_torrents table.

The reindex_nyaapantsu.py script then queries the
reindex_torrents table and applies the corresponding reindex action to
Elasticsearch. The processed entries are then removed from the reindex_torrents table.

* Reindex every 5 minutes as cronjob
2017-05-30 21:22:12 -05:00
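The trigger side is not shown in this file. As a rough sketch (the `reindex_torrents` columns below match what the script reads, but the op-to-action mapping and the helper names are assumptions for illustration), the per-row bookkeeping the triggers perform could be mirrored in Python as:

```python
# Hypothetical sketch: each INSERT/UPDATE/DELETE on the torrents table
# records one entry describing the Elasticsearch action to apply later.
def reindex_action(tg_op):
    """Map a trigger operation to an ES bulk '_op_type' (assumed mapping)."""
    return 'delete' if tg_op == 'DELETE' else 'index'

def record_change(reindex_log, tg_op, torrent_id):
    """Append a (torrent_id, action) entry, as the trigger would INSERT."""
    reindex_log.append({'torrent_id': torrent_id,
                        'action': reindex_action(tg_op)})

log = []
record_change(log, 'INSERT', 42)  # -> action 'index'
record_change(log, 'UPDATE', 42)  # -> action 'index' (re-index the doc)
record_change(log, 'DELETE', 7)   # -> action 'delete'
# The reindex script below consumes such entries in batches and removes
# them once they have been applied to Elasticsearch.
```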


# coding: utf-8
from elasticsearch import Elasticsearch, helpers
import psycopg2, sys, os

CHUNK_SIZE = 10000


def getEnvOrExit(var):
    try:
        return os.environ[var]
    except KeyError:
        print('[Error]: Environment variable ' + var + ' not defined.')
        sys.exit(1)


dbparams = getEnvOrExit('PANTSU_DBPARAMS')
pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX')
torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME')

es = Elasticsearch()
pgconn = psycopg2.connect(dbparams)

cur = pgconn.cursor()
# We MUST use NO QUERY CACHE because the values are inserted by triggers and
# not through pgpool.
cur.execute("""/*NO QUERY CACHE*/ SELECT id, torrent_id, action FROM reindex_torrents""")

fetches = cur.fetchmany(CHUNK_SIZE)
while fetches:
    actions = list()
    delete_cur = pgconn.cursor()
    for reindex_id, torrent_id, action in fetches:
        new_action = {
            '_op_type': action,  # 'index' or 'delete'
            '_index': pantsu_index,
            '_type': 'torrents',
            '_id': torrent_id
        }
        if action == 'index':
            select_cur = pgconn.cursor()
            # The table name cannot be a bound parameter, but the id can.
            select_cur.execute("""SELECT torrent_id, torrent_name, category, sub_category, status,
                torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed
                FROM {torrent_tablename}
                WHERE torrent_id = %s""".format(torrent_tablename=torrent_tablename), (torrent_id,))
            (torrent_id, torrent_name, category, sub_category, status, torrent_hash, date,
             uploader, downloads, filesize, seeders, leechers, completed) = select_cur.fetchone()
            doc = {
                'id': torrent_id,
                'name': torrent_name.decode('utf-8'),
                'category': str(category),
                'sub_category': str(sub_category),
                'status': status,
                'hash': torrent_hash,
                'date': date,
                'uploader_id': uploader,
                'downloads': downloads,
                'filesize': filesize,
                'seeders': seeders,
                'leechers': leechers,
                'completed': completed
            }
            new_action['_source'] = doc
            select_cur.close()
        delete_cur.execute('DELETE FROM reindex_torrents WHERE id = %s', (reindex_id,))
        actions.append(new_action)
    pgconn.commit()  # Commit the deletes transaction
    delete_cur.close()
    helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120)
    fetches = cur.fetchmany(CHUNK_SIZE)

cur.close()
pgconn.close()