apache kafka 徐郡明

  • Author: 自摸白板
  • Source: 51数据库
  • 2020-09-20
What exactly does the group.id in Kafka's consumer.properties do?
When deploying a distributed Kafka cluster, does a consumer's group.id have to match the group.id configured in consumer.properties?
I have two different topics, each consumed by its own consumer.
One of the consumers can only receive messages when its group.id matches the one configured in consumer.properties.
The other consumer can only receive messages when its group.id differs from the one configured in consumer.properties.
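For context (an assumption about the setup described, not something stated in the question): config/consumer.properties is not read automatically by consumers written in code; it only affects tools that are explicitly pointed at it, for example the console consumer. A consumer built in code uses whatever group.id is placed in its Properties object.

```shell
# consumer.properties takes effect only where it is explicitly supplied, e.g.:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic page_visits --consumer.config config/consumer.properties
# a consumer written in code ignores that file and uses props.put("group.id", ...)
```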




《Apache Kafka源码剖析》 (Apache Kafka Source Code Analysis) walks through Kafka's architecture and implementation details based on the Kafka 0.10.0 source code. Its five chapters start from Kafka's application scenarios and setting up a source-reading environment, then move step by step through the core concepts and an in-depth analysis of the producer, consumer, and broker source code, and finish with the implementation of Kafka's common management scripts, so that readers understand Kafka both in its overall design and down in its details.
The source analysis is interleaved with the author's working experience and his reading of Kafka's design, in the hope that readers can generalize from it: not just knowing what Kafka does, but why.



Some caveats regarding partitions and consumers:
1. If there are more consumers than partitions, the surplus consumers are wasted: Kafka's design does not allow concurrent consumption of a single partition, so the consumer count should not exceed the partition count.
2. If there are fewer consumers than partitions, one consumer will be assigned several partitions. Balance the consumer and partition counts sensibly, or data will be pulled unevenly across partitions.
Ideally the partition count is an integer multiple of the consumer count, which makes the choice of partition count important; 24, for example, makes it easy to settle on a consumer count.
3. When a consumer reads data from several partitions, ordering between partitions is not guaranteed: Kafka guarantees order only within a single partition, and across partitions the result depends on the order in which you read.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions a consumer is assigned may change.
5. The high-level API blocks when there is no data to fetch.
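Points 1 and 2 above can be made concrete with a dependency-free sketch of range-style assignment, which is roughly how the high-level consumer spreads partitions over consumers (the class and method names below are ours, not Kafka's):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {
    // Rough range assignment: consumer i gets a contiguous slice of partitions.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int per = numPartitions / numConsumers;   // base partitions per consumer
        int extra = numPartitions % numConsumers; // first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int take = per + (c < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int k = 0; k < take; k++) mine.add(next++);
            result.add(mine);
        }
        return result;
    }

    public static void main(String[] args) {
        // 24 partitions split evenly over 6 consumers: 4 partitions each
        System.out.println(assign(24, 6));
        // more consumers than partitions: the surplus consumer sits idle
        System.out.println(assign(2, 3)); // prints [[0], [1], []]
    }
}
```

With 24 partitions, any of 2, 3, 4, 6, 8, or 12 consumers divides evenly, which is why a highly composite partition count makes sizing the consumer group easy.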
A simple version
A simple pitfall: the test flow was to produce some data first and then read it with a consumer, which returned nothing. Remember to add the first line of the settings below.
The initial offset is invalid by default, and this setting says how to correct the offset when it is invalid. The default is largest, i.e. the newest, so without this setting the data produced earlier cannot be read. Worse, adding the smallest setting afterwards no longer helps: by then the offset has become valid, and fixing it requires resetting the offset by hand or with a tool.

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); // must be set, or old data will not be read
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

KafkaStream<byte[], byte[]> stream = streams.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    System.out.println("message: " + new String(it.next().message()));
}

if (consumer != null) consumer.shutdown(); // in fact never reached: hasNext() above blocks

With the high-level consumer, two tools are particularly handy:
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
This shows the current offset status of a group; here, for example, the pv group with 3 partitions:
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
The key columns are offset, logSize, and lag.
Here everything has been read: offset = logSize, and lag = 0.
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
It takes 3 arguments:
[earliest | latest], where to reset the offset to
consumer.properties, the path of the config file
topic, the topic name, e.g. page_visits
Running it against the pv group above and then checking the group's offset status again gives:
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
The offsets have been reset to 0, and lag = logSize.

Below is the complete multi-threaded consumer code from the original article.

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // create the connector; note the config built below
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) { // create concurrent consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads)); // which topic to read, and with how many threads
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // create the streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // each thread maps to one KafkaStream

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}

// The per-thread worker referenced above (ConsumerTest in the original article):
class ConsumerTest implements Runnable {
    private final KafkaStream<byte[], byte[]> m_stream;
    private final int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

SimpleConsumer
The other option is SimpleConsumer. Despite the name suggesting a simple interface, it is actually the low-level consumer, with a more complex API.
Reference:
When should this interface be used?
read a message multiple times
consume only a subset of the partitions in a topic in a process
manage transactions to make sure a message is processed once and only once

The cost of using this interface is that partitions, brokers, and offsets are no longer transparent: you have to manage them yourself, and you have to handle broker leader changes, which is a hassle.
So unless you really have to, do not use it:
you must keep track of the offsets in your application to know where you left off consuming.
you must figure out which broker is the lead broker for a topic and partition
you must handle broker leader changes
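The first of these burdens, tracking offsets in the application, amounts to keeping a position per topic-partition. A minimal dependency-free sketch (`OffsetTracker` and its methods are hypothetical names, not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTracker {
    // next offset to fetch, keyed by "topic-partition"
    private final Map<String, Long> nextOffset = new HashMap<>();

    // where to resume: the remembered position, or a chosen start (e.g. 0 for earliest)
    public long position(String topic, int partition, long startIfUnknown) {
        return nextOffset.getOrDefault(topic + "-" + partition, startIfUnknown);
    }

    // after successfully processing a message, advance past it
    public void advance(String topic, int partition, long processedOffset) {
        nextOffset.put(topic + "-" + partition, processedOffset + 1);
    }
}
```

A real application would also persist this map somewhere durable, so a restart can resume where it left off.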
Steps for using SimpleConsumer:
find an active broker and find out which broker is the leader for your topic and partition
determine who the replica brokers are for your topic and partition
build the request defining what data you are interested in
fetch the data
identify and recover from leader changes
First, you must know which topic and which partition to read.
Then find the broker leader responsible for that partition, and the brokers holding replicas of that partition.
Next, build the request yourself and fetch the data.
Finally, note that you need to detect and handle broker leader changes.
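The last step, recovering from leader changes, typically reduces to: on a "not leader" error, re-query the cluster metadata and retry against the new leader. A dependency-free sketch (all names here are ours; the iterator stands in for repeated metadata lookups):

```java
import java.util.Iterator;

public class LeaderRetrySketch {
    static class NotLeaderException extends RuntimeException {}

    interface Fetcher {
        String fetchFrom(String broker) throws NotLeaderException;
    }

    // Retry a fetch; on a leader error, take the next leader from metadata and try again.
    static String fetchWithRetry(Fetcher fetcher, String leader,
                                 Iterator<String> newLeaders, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return fetcher.fetchFrom(leader);
            } catch (NotLeaderException e) {
                if (!newLeaders.hasNext()) break;
                leader = newLeaders.next(); // re-queried metadata -> new leader
            }
        }
        throw new RuntimeException("no leader found");
    }
}
```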