Chinaunix首页 | 论坛 | 博客
  • 博客访问: 153154
  • 博文数量: 130
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1302
  • 用 户 组: 普通用户
  • 注册时间: 2018-11-15 23:50
个人简介

.。。。。。。。

文章分类

全部博文(130)

文章存档

2022年(12)

2018年(118)

我的朋友

分类: Python/Ruby

2022-09-21 10:20:54

# 实现kafka消息分流
# 生产者将相同类型的消息放入指定的队列
# 消费者消费指定队列的数据

from multiprocessing import Process, JoinableQueue, cpu_count
import time

import json
from kafkaAPI import kafka_consumer as kafka_consumer
from cfg.config import config as cfg

def get_partition_que():
    que_dict = {}
    for partition_num in range(cpu_count()):
        que_dict.setdefault(partition_num, JoinableQueue(maxsize=10))
    return que_dict

def consumer(q):
    while True:
        res = q.get()
        print('consumer: %s' % (res.get("c_id", "")), q)
        q.task_done()  # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕
 
 
def producer(que_dict, csumer):
    for msg in csumer.consumer:
        message = json.loads(msg.value)
        if isinstance(message, dict):
            c_asset_id = message.get("c_id", "")
            q = que_dict.get(hash(c_asset_id) % 4)
            print("producer:", message.get("c_id", ""), q)
            q.put(message)
    q.join()


def test():
    que_dict = get_partition_que()
    producer_list = []
    for _ in range(len(que_dict)):
        producer_list.append(Process(target=producer, args=(que_dict,
        kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS))))

    # csumer1 = kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS)
    # csumer2 = kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS)
    # csumer3 = kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS)
    # csumer4 = kafka_consumer.GetKafkaConsumer(cfg.KAFKA_TOPICS)
    # # 生产者们
    # p1 = Process(target=producer, args=(que_dict, csumer1))
    # p2 = Process(target=producer, args=(que_dict, csumer2))
    # p3 = Process(target=producer, args=(que_dict, csumer3))
    # p4 = Process(target=producer, args=(que_dict, csumer4))

    consumer_list = []
    for i in range(len(que_dict)):
        c_list = []
        for _ in range(2):
            c = Process(target=consumer, args=(que_dict[i],))
            c.daemon = True
            c_list.append(c)
        consumer_list.extend(c_list)
    # 消费者们
    # c1 = Process(target=consumer, args=(que_dict[0],))
    # c2 = Process(target=consumer, args=(que_dict[1],))
    # c3 = Process(target=consumer, args=(que_dict[2],))
    # c4 = Process(target=consumer, args=(que_dict[3],))
    # c1.daemon = True
    # c2.daemon = True
    # c3.daemon = True
    # c4.daemon = True

    #开始
    # consumer_list=[c1, c2, c3, c4]
    print(producer_list)
    print(consumer_list)
    consumer_list.extend(producer_list)
    # print(producer_list)
    # print(consumer_list)
    for p in consumer_list:
        p.start()
 
    # p_list=[p1, p2, p3, p4]
    for pl in producer_list:
        pl.join()

    # 1、主进程等生产者p1,p2,p3结束
    # 2、而p1,p2,p3,是在消费者把所有数据都取干净之后才会结束
    # 3、所以一旦p1,p2,p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
    # print("主")

if __name__ == '__main__':
    test()
    # print(get_partition_que())
阅读(667) | 评论(0) | 转发(0) |
0

上一篇:docker镜像加速

下一篇:没有了

给主人留下些什么吧!~~