2017-05-31 04:22:12 +02:00
# coding: utf-8
from elasticsearch import Elasticsearch , helpers
import psycopg2 , pprint , sys , time , os
# Number of queue rows fetched from PostgreSQL per batch; the same value is
# passed as chunk_size to the Elasticsearch bulk helper.
CHUNK_SIZE = 10000
def getEnvOrExit(var):
    """Return the value of environment variable *var*.

    If the variable is not defined, print an error message and terminate
    the process with exit status 1.
    """
    try:
        # Only a missing key is an expected failure here; a bare ``except``
        # would also swallow KeyboardInterrupt and unrelated errors.
        return os.environ[var]
    except KeyError:
        print('[Error]: Environment variable ' + var + ' not defined.')
        sys.exit(1)
# Connection settings and target names all come from the environment; the
# script exits immediately if any of them is missing.
dbparams = getEnvOrExit('PANTSU_DBPARAMS')
pantsu_index = getEnvOrExit('PANTSU_ELASTICSEARCH_INDEX')
torrent_tablename = getEnvOrExit('PANTSU_TORRENT_TABLENAME')

es = Elasticsearch()
pgconn = psycopg2.connect(dbparams)

# A table name cannot be sent as a bound parameter, so it is interpolated
# once here (it comes from the operator-controlled environment). The torrent
# id is always passed as a bound parameter (%s) at execute time.
SELECT_TORRENT_SQL = """SELECT torrent_id, torrent_name, category, sub_category, status,
                               torrent_hash, date, uploader, downloads, filesize, seeders, leechers, completed
                        FROM {torrent_tablename}
                        WHERE torrent_id = %s""".format(torrent_tablename=torrent_tablename)

# The queue row is removed by the same key column that the queue SELECT
# below reads (reindex_torrents_id).
# NOTE(review): the previous statement filtered on "id" -- confirm the
# column name against the reindex_torrents schema.
DELETE_REINDEX_SQL = 'DELETE FROM reindex_torrents WHERE reindex_torrents_id = %s'


def _build_document(row):
    """Turn one torrent row (in SELECT_TORRENT_SQL column order) into the
    document body that is sent to Elasticsearch."""
    (torrent_id, torrent_name, category, sub_category, status, torrent_hash,
     date, uploader, downloads, filesize, seeders, leechers, completed) = row
    # The driver may hand back bytes (Python 2) or text (Python 3); only
    # bytes need decoding.
    if isinstance(torrent_name, bytes):
        torrent_name = torrent_name.decode('utf-8')
    return {
        'id': torrent_id,
        'name': torrent_name,
        'category': str(category),
        'sub_category': str(sub_category),
        'status': status,
        'hash': torrent_hash,
        'date': date,
        'uploader_id': uploader,
        'downloads': downloads,
        'filesize': filesize,
        'seeders': seeders,
        'leechers': leechers,
        'completed': completed,
    }


cur = pgconn.cursor()
# We MUST use NO QUERY CACHE because the values are inserted by triggers and
# not through pgpool.
cur.execute("""/*NO QUERY CACHE*/ SELECT reindex_torrents_id, torrent_id, action FROM reindex_torrents""")

fetches = cur.fetchmany(CHUNK_SIZE)
while fetches:
    actions = []
    with pgconn.cursor() as delete_cur:
        for reindex_id, torrent_id, action in fetches:
            new_action = {
                '_op_type': action,
                '_index': pantsu_index,
                '_type': 'torrents',
                '_id': torrent_id,
            }
            row = None
            if action == 'index':
                with pgconn.cursor() as select_cur:
                    select_cur.execute(SELECT_TORRENT_SQL, (torrent_id,))
                    row = select_cur.fetchone()
                if row is not None:
                    new_action['_source'] = _build_document(row)

            # The queue entry is consumed either way.
            delete_cur.execute(DELETE_REINDEX_SQL, (reindex_id,))

            if action == 'index' and row is None:
                # The torrent row no longer exists, so there is nothing to
                # index; skip the action instead of crashing on the unpack.
                continue
            actions.append(new_action)

    # NOTE(review): the queue deletions are committed before the bulk request
    # is sent, so a failed bulk call loses this batch's queue entries. The
    # original ordering is kept here; consider whether it is intended.
    pgconn.commit()  # Commit the deletes transaction
    helpers.bulk(es, actions, chunk_size=CHUNK_SIZE, request_timeout=120)
    fetches = cur.fetchmany(CHUNK_SIZE)

cur.close()
pgconn.close()