lin-node.py
# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205
import functools
import json
import logging
import os
import sys
import threading
import traceback
from random import randint
from time import sleep

import pika
from pika.exchange_type import ExchangeType
from loguru import logger as lg
cleanupfiles = os.getenv('cleanup', 'True')
app_env = os.getenv('app_env', 'Dev')
#rabbitmq_host = os.getenv('rabbitmq_host', 'localhost')
rabbitmq_host = os.getenv('rabbitmq_host', 'cluster.rke.natimark.com')
#rabbitmq_port = int(os.getenv('rabbitmq_port', '5672'))
rabbitmq_port = int(os.getenv('rabbitmq_port', '32304'))
rabbitmq_user = os.getenv('rabbitmq_user', 'guest')
rabbitmq_pass = os.getenv('rabbitmq_pass', 'guest')
# How many worker threads may run at once; enforced below via
# channel.basic_qos(prefetch_count=worker_threads).
worker_threads = 4
# On a Linux node we watch the 'finish' queue and have no result queue;
# a Windows node would watch 'start' and post results to 'finish':
# if os.name == 'posix':
#     rabbitmq_watchqueue = os.getenv('rabbitmq_watchqueue', 'finish')
#     rabbitmq_resultqueue = os.getenv('rabbitmq_resultqueue', '')
# else:
#     rabbitmq_watchqueue = os.getenv('rabbitmq_watchqueue', 'start')
#     rabbitmq_resultqueue = os.getenv('rabbitmq_resultqueue', 'finish')
rabbitmq_watchqueue = os.getenv('rabbitmq_watchqueue', 'finish')
rabbitmq_resultqueue = os.getenv('rabbitmq_resultqueue', '')
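# The watch-queue name doubles as the handler name: do_work() looks up the
# module-level function with the same name (start() or finish()) and calls
# it with the decoded message.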
# Route uncaught exceptions to a loguru log file. Add the file sink once at
# import time rather than inside the hook, so repeated exceptions don't
# attach duplicate sinks.
lg.add("file_{time}.log")
def exception_handler(exctype, value, tb):
    lg.error(exctype)
    lg.error(value)
    lg.error(''.join(traceback.format_tb(tb)))
sys.excepthook = exception_handler
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
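# Two logging stacks are in play: the stdlib LOGGER above (pika internals and
# the consume path, at DEBUG) and loguru's lg (job handlers and the excepthook).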
def ack_message(ch, delivery_tag):
    """Note that `ch` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if ch.is_open:
        ch.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message; the broker
        # will redeliver it once it notices the connection dropped.
        LOGGER.warning('Channel closed before delivery tag %s could be acked',
                       delivery_tag)
def do_work(ch, delivery_tag, body):
    thread_id = threading.get_ident()
    jsondata = json.loads(body)
    LOGGER.info('Thread id: %s Delivery tag: %s jobbatchid: %s', thread_id,
                delivery_tag, jsondata['request']['params']['jobbatchid'])
    # Dispatch to the module-level function named after the watch queue.
    entry_func = globals()[rabbitmq_watchqueue]
    try:
        entry_func(jsondata)
    except Exception:
        print("""############
############
############
Error running job:
""" + traceback.format_exc() + """############
############
############""")
        # If we error, stop everything (bounce the container). stop_consuming()
        # must run on the connection's own thread, so schedule it threadsafe
        # instead of calling it from this worker thread.
        ch.connection.add_callback_threadsafe(ch.stop_consuming)
    finally:
        cb = functools.partial(ack_message, ch, delivery_tag)
        ch.connection.add_callback_threadsafe(cb)
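# Acks are funneled through add_callback_threadsafe() because pika's
# BlockingConnection/BlockingChannel are not thread-safe: only the thread
# running start_consuming() may touch the channel, so worker threads just
# schedule ack_message() to run there.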
def on_message(ch, method_frame, _header_frame, body, args):
    (thrds,) = args
    delivery_tag = method_frame.delivery_tag
    # Hand the delivery off to a worker thread so the connection's I/O loop
    # (and its heartbeats) keep running during long jobs.
    t = threading.Thread(target=do_work, args=(ch, delivery_tag, body))
    t.start()
    thrds.append(t)
def start(jsondata):
    print("""############
############
############
starting job
############
############
############""")
    jsondata = runjob(jsondata)
    putcompleted(jsondata)
    return jsondata
def runjob(jsondata):
    # This runs on the Windows node: simulate work with a random sleep.
    sleepint = randint(10, 100)
    sleep(sleepint)
    jsondata['response'] = {}
    jsondata['response']['result'] = 'windows slept for ' + str(sleepint)
    lg.debug(json.dumps(jsondata))
    return jsondata
def putcompleted(jsondata):
    r = post_to_queue(jsondata, rabbitmq_resultqueue)
    return r
def finish(jsondata):
    # This runs on the Linux node: simulate work with a random sleep.
    sleepint = randint(10, 100)
    sleep(sleepint)
    jsondata['response'] = {}
    jsondata['response']['result'] = 'Linux slept for ' + str(sleepint)
    lg.debug(json.dumps(jsondata))
    return  # all done
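# With the defaults above, this Linux node consumes 'finish' and has no result
# queue, so finish() just logs; on the Windows side, start() posts its result
# onward through putcompleted().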
def rabbit_init():
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=rabbitmq_host,
        port=rabbitmq_port,
        # Use the same credentials as the consuming connection; without them
        # this only works when the broker allows guest access.
        credentials=pika.PlainCredentials(username=rabbitmq_user,
                                          password=rabbitmq_pass)))
    channel = connection.channel()
    return connection, channel
def post_to_queue(jsondata, queuename):
    connection, channel = rabbit_init()
    channel.queue_declare(queue=queuename, auto_delete=False)
    channel.basic_publish(exchange='', routing_key=queuename,
                          body=json.dumps(jsondata))
    connection.close()
    return {'status': 'sent'}
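# A minimal sketch of feeding this worker a job (the payload shape mirrors
# what do_work() reads; the jobbatchid value is hypothetical):
#
#   post_to_queue({'request': {'params': {'jobbatchid': 42}}},
#                 rabbitmq_watchqueue)
#
# Note that post_to_queue() opens a fresh connection per publish via
# rabbit_init(); simple and thread-safe, at the cost of one handshake per
# message.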
credentials = pika.PlainCredentials(username=rabbitmq_user,password=rabbitmq_pass)
# Note: sending a short heartbeat to prove that heartbeats are still
# sent even though the worker simulates long-running work
parameters = pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port, credentials=credentials, heartbeat=30)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
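# Topology: a direct exchange, a queue, and a binding that all share the
# watch-queue name, so anything published to that exchange with the queue
# name as routing key lands in the queue this worker consumes.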
channel.exchange_declare(
    exchange=rabbitmq_watchqueue,
    exchange_type=ExchangeType.direct,
    passive=False,
    durable=True,
    auto_delete=False)
channel.queue_declare(queue=rabbitmq_watchqueue, auto_delete=False)
channel.queue_bind(
    queue=rabbitmq_watchqueue, exchange=rabbitmq_watchqueue,
    routing_key=rabbitmq_watchqueue)
# Prefetch bounds how many unacked deliveries (and therefore how many worker
# threads) are in flight at once; worker_threads above is the knob. In
# production you will want to test different values to find which one
# provides the best performance and usability for your solution.
channel.basic_qos(prefetch_count=worker_threads)
threads = []
on_message_callback = functools.partial(on_message, args=(threads,))
channel.basic_consume(on_message_callback=on_message_callback, queue=rabbitmq_watchqueue)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()

# Wait for all worker threads to complete before closing the connection.
for thread in threads:
    thread.join()
connection.close()