-
Notifications
You must be signed in to change notification settings - Fork 6
/
kafkaProducer.py
40 lines (35 loc) · 1.31 KB
/
kafkaProducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import sys
import json
import time
import random
import datetime
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='master:6667')
kafkaTopicName = 'test'
if len(sys.argv) != 5:
print("Usage: [filename].py chSize, chCountMax, msgCount, msgRate(kbps)")
exit(-1)
msgRate = int(sys.argv[4])
byteSent = 0
startTime = datetime.datetime.now()
for _ in range(int(sys.argv[3])):
chDict = {}
chDict['time'] = int(time.time())
chTemp = {}
for i in range(10):
chName = "CH_" + str(random.randint(1, int(sys.argv[1])))
chVal = random.randint(0, int(sys.argv[2]))
chTemp[chName] = chVal
chDict['data'] = chTemp
result = json.dumps(chDict)
print(result)
producer.send(kafkaTopicName, result)
byteSent += len(result)
expectedTime = (byteSent * 8.0) / (msgRate * 1024)
elapsedTime = datetime.datetime.now() - startTime
while elapsedTime.total_seconds() < expectedTime:
time.sleep(0.001)
elapsedTime = datetime.datetime.now() - startTime
endTime = datetime.datetime.now()
totalTime = endTime - startTime
print("Run " + str(totalTime.total_seconds()) + "sec")