Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1971500
  • 博文数量: 333
  • 博客积分: 10161
  • 博客等级: 上将
  • 技术积分: 5238
  • 用 户 组: 普通用户
  • 注册时间: 2008-02-19 08:59
文章分类

全部博文(333)

文章存档

2017年(10)

2014年(2)

2013年(57)

2012年(64)

2011年(76)

2010年(84)

2009年(3)

2008年(37)

分类: 大数据

2017-09-28 19:36:54

from kafka import KafkaConsumer


# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))


# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)


# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))


# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)


# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)


# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')


# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')

阅读(5382) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~