# task_utils.py
import json
import uuid

import pika
from prefect import get_run_logger


def get_task_names(task_obj_list):
    # Collect the task_name attribute from each task model object.
    task_names = []
    for obj in task_obj_list:
        task_names.append(obj.task_name)
    return task_names


def get_task_contexts(task_obj_list):
    contexts = []
    for obj in task_obj_list:
        # The context is stored with single quotes; swap them for double
        # quotes so the string becomes valid JSON before parsing.
        context = json.loads(obj.context.replace('\'', '"'))
        contexts.append(context)
    return contexts
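
# Illustrative example (values are hypothetical, not from this codebase):
# a stored context such as "{'dataset': 'sales', 'year': 2023}" becomes
# {"dataset": "sales", "year": 2023} after the quote swap and json.loads.
# Note the swap misparses values that themselves contain apostrophes; for
# strings that are Python literals, ast.literal_eval is a safer alternative.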


def set_task_model_values(task, pipeline):
    # Resource creation is currently stubbed out; output_id is hard-coded.
    task.output_id = '1'
    # create_resource(
    #     {'package_id': pipeline.model.output_id, 'resource_name': task.task_name, 'data': pipeline.data})
    print({'package_id': task.output_id, 'resource_name': task.task_name, 'data': pipeline.data})
    task.status = "Done"
    task.save()


def remove_unnamed_col(data_frame):
    # Drop pandas columns whose header starts with "Unnamed" (typically the
    # index column of a CSV that was written without index=False).
    return data_frame.loc[:, ~data_frame.columns.str.contains('^Unnamed')]
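
# Illustrative example (hypothetical data): a DataFrame round-tripped through
# to_csv()/read_csv() without index=False grows an "Unnamed: 0" column:
#   df = pd.DataFrame({"Unnamed: 0": [0, 1], "value": [10, 20]})
#   remove_unnamed_col(df)  # keeps only the "value" column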


def populate_task_schema(key_entry, format_entry, description_entry):
    schema_dict = {"key": key_entry, "format": format_entry, "description": description_entry}
    return schema_dict
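
# Illustrative example (hypothetical values):
#   populate_task_schema("price", "float", "Unit price of the item")
#   # -> {"key": "price", "format": "float", "description": "Unit price of the item"}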


def send_error_to_prefect_cloud(e: Exception):
    # Log the exception so it appears in the Prefect Cloud run logs.
    prefect_logger = get_run_logger()
    prefect_logger.error(str(e))


class TasksRpcClient(object):
    def __init__(self, task_name, context, data):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.routing_key = task_name
        self.context = context
        self.data = data
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
        # Server-named reply queue; the worker publishes its responses here.
        result = self.channel.queue_declare(queue='', exclusive=False, durable=True)
        self.callback_queue = result.method.queue
        print("queue name-----", self.callback_queue)
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)
        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        print("correlation id while receiving...", props.correlation_id)
        # Only accept replies that match the correlation id of our request.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # First send an ack message to check whether a worker is up.
        ack_msg = "get-ack"
        self.channel.basic_publish(
            exchange='topic_logs',
            routing_key=self.routing_key,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=ack_msg.encode("utf-8"))
        # Wait up to two seconds for the worker to acknowledge.
        self.connection.process_data_events(time_limit=2)
        if self.response is None:
            return "Worker failed with an error - No worker to pick the task".encode('utf-8')
        print(self.response, "Response before message")
        # Send the actual task only if the worker is alive.
        message = {"context": self.context,
                   "data": self.data}
        self.channel.basic_publish(
            exchange='topic_logs',
            routing_key=self.routing_key,
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=json.dumps(message))
        # Block until the worker replies with the task result.
        self.connection.process_data_events(time_limit=None)
        return self.response
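
# Usage sketch for the RPC client (task name and context are hypothetical;
# this module does not show the worker side). A worker bound to the task's
# routing key on the "topic_logs" exchange is expected to reply to the
# "get-ack" probe and then to the JSON task payload, publishing to the
# reply_to queue with the request's correlation_id:
#
#   client = TasksRpcClient("clean_data", {"columns": ["a", "b"]}, data=None)
#   result_bytes = client.call()  # bytes: a CSV result or the error string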


def publish_task_and_process_result(task_obj, context, data):
    if data is not None:  # scraper tasks pass data as None
        data = data.to_json()
    task_publisher = TasksRpcClient(task_obj.task_name, context, data)
    try:
        data_bytes = task_publisher.call()  # CSV result as bytes
        data = data_bytes.decode("utf-8")
        print(data, "******")
    except Exception as e:
        print(str(e), "&&&&&&&")
        send_error_to_prefect_cloud(e)
    print("data in prefect..", data)
    exception_flag = False
    # Guard against data being None (a scraper task whose RPC call raised).
    if isinstance(data, str) and data.startswith("Worker failed with an error"):
        print("found err msg", data)
        send_error_to_prefect_cloud(Exception(data))
        task_obj.status = "Failed"
        task_obj.save()
        exception_flag = True
    return data, exception_flag
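
# Usage sketch (caller shape assumed, not part of this module): inside a
# Prefect task, publish a pipeline step and stop on failure:
#
#   result, failed = publish_task_and_process_result(task_obj, context, df)
#   if failed:
#       raise Exception(result)  # surfaces the worker error to the flow run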