1:首先创建topic
./bin/kafka-topics.sh -zookeeper zk1:2181,zk2:2181,zk3:2181 -topic test -replication-factor 3 -partition 3 -create
2:查看topic
./bin/kafka-topics.sh -zookeeper zk1:2181,zk2:2181,zk3:2181 -list
3:然后执行如下代码
-
package com.xx.bigdata.kafkaTool;
-
-
import org.apache.kafka.clients.KafkaClient;
-
import org.apache.kafka.clients.producer.KafkaProducer;
-
import org.apache.kafka.clients.producer.ProducerConfig;
-
import org.apache.kafka.clients.producer.ProducerRecord;
-
-
import java.util.Properties;
-
import java.util.UUID;
-
-
public class Producer {
-
public static void main(String[] args) {
-
Properties properties = new Properties() {{
-
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "zk1:9092,zk2:9092,zk3:9092");
-
put(ProducerConfig.ACKS_CONFIG, "all");
-
put(ProducerConfig.RETRIES_CONFIG, 0);
-
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
-
put(ProducerConfig.LINGER_MS_CONFIG, 1);
-
put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
-
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-
}};
-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
while (true) {
-
try {
-
Thread.sleep(500);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString(), UUID.randomUUID().toString()));
-
}
-
}
-
}
阅读(746) | 评论(0) | 转发(0) |