# 实现kafka消息分流
# 生产者将相同类型的消息放入指定的队列
# 消费者消费指定队列的数据
from multiprocessing import Process, JoinableQueue, cpu_count
import time
import json
from kafkaAPI import kafka_consumer as kafka_consumer
from cfg.config import config as cfg
def get_partition_que():
    """Create one bounded ``JoinableQueue`` per CPU core.

    Returns:
        dict: maps each integer in ``range(cpu_count())`` to its own
        ``JoinableQueue`` created with ``maxsize=10``.
    """
    return {
        partition_num: JoinableQueue(maxsize=10)
        for partition_num in range(cpu_count())
    }
def consumer(q):
    """Take messages from *q* forever and print the ``c_id`` of each one.

    Args:
        q: queue-like object providing ``get()`` and ``task_done()``; every
            item taken from it must provide ``.get(key, default)``.

    This function never returns on its own; it only stops when ``q.get()``
    (or the processing of an item) raises, or when the process is killed.
    """
    while True:
        message = q.get()
        c_id = message.get("c_id", "")
        print('consumer: %s' % c_id, q)
        # Signal q.join(): one item has been taken off the queue and
        # fully processed.
        q.task_done()
def producer(que_dict, csumer):
    """Route messages from a Kafka consumer to queues, keyed by ``c_id``.

    Every record yielded by ``csumer.consumer`` has its ``value`` decoded as
    JSON. Dict payloads are put on the queue selected by a stable hash of
    their ``c_id`` field, so messages sharing a ``c_id`` always land on the
    same queue. Non-dict payloads are ignored.

    Args:
        que_dict: mapping of ``0 .. len(que_dict) - 1`` to queue objects that
            provide ``put()`` and ``join()``.
        csumer: object whose ``consumer`` attribute is an iterable of records
            exposing a ``value`` attribute holding the JSON payload.

    Raises:
        ValueError: if ``que_dict`` is empty.
    """
    # Function-scope import so this block does not depend on a new
    # file-level import.
    import zlib

    queue_count = len(que_dict)
    if not queue_count:
        raise ValueError("que_dict must contain at least one queue")

    used_indexes = set()
    for msg in csumer.consumer:
        try:
            message = json.loads(msg.value)
        except (TypeError, ValueError) as exc:
            # A single malformed (or empty/None) payload must not kill the
            # whole producer process; report it and move on.
            print("producer: skipping undecodable message:", exc)
            continue
        if not isinstance(message, dict):
            continue

        c_id = message.get("c_id", "")
        # Built-in hash() of a str is salted per interpreter process, so
        # separately started producers could route the same c_id to
        # different queues; it also fails on unhashable ids (e.g. lists).
        # CRC32 over the textual form is deterministic everywhere.
        # The modulus is the real queue count rather than a hard-coded 4,
        # so every queue is reachable and no lookup can miss.
        index = zlib.crc32(str(c_id).encode("utf-8")) % queue_count
        q = que_dict[index]
        print("producer:", c_id, q)
        q.put(message)
        used_indexes.add(index)

    # Once the stream ends, wait until every queue this producer wrote to
    # has been fully processed (consumers call task_done() per item).
    for index in sorted(used_indexes):
        que_dict[index].join()
def test():
    """Start the producer/consumer process pipeline and wait for producers.

    For every queue returned by ``get_partition_que()`` this creates one
    ``producer`` process and two daemon ``consumer`` processes, starts the
    consumers first and then the producers, and finally blocks until all
    producer processes have exited.
    """
    queues = get_partition_que()

    # One producer process per queue. Each gets its own object from
    # kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS) and the full queue
    # mapping, since a producer may route to any queue.
    producers = [
        Process(
            target=producer,
            args=(queues, kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS)),
        )
        for _ in queues
    ]

    # Two consumer processes per queue. consumer() loops forever, so these
    # are daemon processes: they are terminated when the main process exits
    # instead of keeping it alive.
    consumers = [
        Process(target=consumer, args=(queues[idx],), daemon=True)
        for idx in range(len(queues))
        for _ in range(2)
    ]

    print(producers)
    print(consumers)

    # Consumers are started before producers.
    for proc in consumers + producers:
        proc.start()

    # The main process only waits for the producers; the daemon consumers
    # go away together with the main process once it returns.
    for proc in producers:
        proc.join()
# Script entry point. The guard keeps the pipeline from being launched when
# this module is imported rather than executed (multiprocessing's "spawn"
# start method re-imports the main module in every child process).
if __name__ == "__main__":
    test()