golang连接kafka消费进ES操作
- 作者: 你是我遥不可及的梦i39507074
- 来源: 51数据库
- 2021-08-25
1.首先初始化conf配置把kafka和es的地址配置好还有一个日志方便查看
配置信息如下 用到的库是
github.com/astaxie/beego/config [logs] log_level = debug log_path = "./logs/log_transfer.log" [kafka] server_addr = 192.168.0.134:9092 topic = nginx_log [es] addr = http://192.168.0.134:9200/
2.读取conf配置存取进结构体
type logconfig struct {
kafkaaddr string
esaddr string
logpath string
loglevel string
topic string
}
var (
logconfig *logconfig
)
3.读取conf配置代码如下
func initconfig(conftype string,filename string)(err error) {
conf, err := config. newconfig(conftype,filename)
if err != nil {
fmt. println( "new config faild,err:",err)
return
}
logconfig = &logconfig{}
logconfig.loglevel = conf. string( "logs::log_level")
if len(logconfig.loglevel) == 0 {
logconfig.loglevel = "debug"
}
logconfig.logpath = conf. string( "logs::log_path")
if len(logconfig.logpath) == 0 {
logconfig.logpath = "./logs"
}
logconfig.kafkaaddr = conf. string( "kafka::server_addr")
if len(logconfig.kafkaaddr) == 0 {
err = fmt. errorf( "invalid kafka addr err")
return
}
logconfig.esaddr = conf. string( "es::addr")
if len(logconfig.esaddr) == 0 {
err = fmt. errorf( "invalid es addr err")
return
}
logconfig.topic = conf. string( "kafka::topic")
if len(logconfig.topic) == 0 {
err = fmt. errorf( "invalid topic addr err")
return
}
return
}
4.完成了initconfig的初始化
5.初始化initlogger
func convertloglevel(level string) int {
switch(level) {
case "debug":
return logs.leveldebug
case "warn":
return logs.levelwarn
case "info":
return logs.levelinfo
case "trace":
return logs.leveltrace
}
return logs.leveldebug
}
func initlogger(logpath string, loglevel string) (err error) {
config := make( map[ string] interface{})
config[ "filename"] = logpath
config[ "level"] = convertloglevel(loglevel)
configstr, err := json. marshal(config)
if err!= nil {
fmt. println( "marshal failed,err:",err)
return
}
logs. setlogger(logs.adapterfile, string(configstr))
return
}
6.初始化kafka
type kafkaclient struct {
client sarama.consumer
addr string
topic string
wg sync.waitgroup
}
var (
kafkaclient *kafkaclient
)
func initkafka(addr string,topic string)(err error) {
kafkaclient = &kafkaclient{}
consumer, err := sarama. newconsumer(strings. split(addr, ","), nil)
if err != nil {
logs. error( "failed to strat consumer :",err)
return
}
kafkaclient.client = consumer
kafkaclient.addr = addr
kafkaclient.topic = topic
return
}
7.初始化es
gopkg.in/olivere/elastic.v2 // 这个是操作es的库
type logmessage struct {
app string
topic string
message string
}
var (
esclient *elastic.client
)
func inites(addr string)(err error) {
client, err := elastic. newclient(elastic. setsniff( false),elastic. seturl(addr))
if err != nil {
fmt. println( "connect es error",err)
return
}
esclient = client
return
}
8.干活把kafka的数据写入es
github.com/shopify/sarama 这个是操作kafka的驱动库
func run()(err error) {
fmt. println( "run")
partitionlist, err := kafkaclient.client. partitions(kafkaclient.topic)
if err != nil {
logs. error( "ini failed ,err:%v",err)
fmt. printf( "ini failed ,err:%v",err)
return
}
for partition := range partitionlist {
fmt. println( "for进入")
pc, errret := kafkaclient.client. consumepartition(kafkaclient.topic, int32(partition),sarama.offsetnewest)
if errret != nil {
err = errret
logs. error( "failed to start consumer for partition %d: %s \n ",partition,err)
fmt. printf( "failed to start consumer for partition %d: %s \n ",partition,err)
return
}
defer pc. asyncclose()
fmt. println( "马上进入协程")
kafkaclient.wg. add( 1)
go func(pc sarama.partitionconsumer){
fmt. println( "进来了")
for msg := range pc. messages() {
fmt. println( "func执行")
logs. debug( "partition:%d,offset:%d,key:%s,value:%s",msg.partition,msg.offset, string(msg.key), string(msg.value))
//fmt.println()
err = sendtoes(kafkaclient.topic,msg.value)
if err != nil {
logs. warn( "send to es failed,err:%v",err)
}
}
kafkaclient.wg. done()
}(pc)
}
kafkaclient.wg. wait()
fmt. println( "协程执行完毕")
return
}
上面代码是读kafka消费数据通过sendtoes这个函数发送至es里面
sendtoes代码如下
func sendtoes(topic string,data [] byte) (err error) {
msg := &logmessage{}
msg.topic = topic
msg.message = string(data)
_, err = esclient. index().
index(topic).
type(topic).
bodyjson(msg).
do()
if err != nil {
return
}
return
}
index就是索引名称
index().type().bodyjson().do()这样的写法是链式执行操作
9.写完了基本操作后 再写一个模拟写入数据进kafka的数据 代码如下
func main() {
config := sarama. newconfig()
config.producer.requiredacks = sarama.waitforall
config.producer.partitioner = sarama.newrandompartitioner
config.producer.return.successes = true
client, err := sarama. newsyncproducer([] string{ "127.0.0.1:9092"}, config)
if err != nil {
fmt. println( "producer close,err:", err)
return
}
defer client. close()
var n int= 0
for {
n++
msg := &sarama.producermessage{}
msg.topic = "nginx_log"
msg.value = sarama. stringencoder( "this is a good test,hello maomaochong!!," + strconv. itoa(n))
pid, offset, err := client. sendmessage(msg)
if err != nil {
fmt. println( "send message failed,", err)
return
}
fmt. printf( "pid:%v offset:%v \n ", pid, offset)
time. sleep(time.second * 2)
}
}
这个就是生产者往kafka里面写入数据进去消费
10.我们启动我们的kafka 注意kafka依赖于zookeeper 先启动zk然后启动kafka
我这里用的是zookeeper-3.4.12网上有下载
启动zk

zk已经成功启动
11.启动kafka 我这里是kafka_2.11-1.1.0
.\bin\windows\kafka-server-start.bat .\config\server.properties

kafka已经跑起来了
12.把kafka消费测试端也启动
.\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181

消费端启动成功 一直等待数据进来消费
13.然后我们把es 和kib 都启动了

这是我们的es版本是5.5.1的 已经跑起来了 接着启动我们的kib
kib里面有个配置config下面的叫kibana.yml里面设置好es的地址和端口就处于监听es状态
启动kib有点慢 稍微等一下就好

此时启动好了kib
14.测试kib是否启动
默认地址是http://localhost:5601

进入成功 确定没问题
15.编译我们的代码 写数据进kafka

从上面看我们知道一个再写 一个再消费
16.编译运行我们把kafka写入进es里面的代码

运行了 这里就把kafka消费的数据 写入进es里面
17.我们看一下数据是否有 进入kib

我们看到有数据了 已经成功了~
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
