Writing logrus logs to local disk with a hook

  • Source: 51数据库
  • 2021-09-21

logrus is a logging framework for Go, and its most exciting feature is the hook mechanism: by registering hooks when initializing logrus you can extend it in all sorts of ways, shipping logs to middleware such as Elasticsearch or ActiveMQ, or even to your email or DingTalk. Don't ask why I know it can go to DingTalk; that knowledge came with tears and a forced smile!

Back to the topic: here we simply use the hook mechanism to write log output to local disk.

First, install logrus:

go get github.com/sirupsen/logrus

Then you can start logging. logrus offers six log levels, which can be called directly:

logrus.Debug("Useful debugging information.")
logrus.Info("Something noteworthy happened!")
logrus.Warn("You should probably take a look at this.")
logrus.Error("Something failed but I'm not quitting.")
logrus.Fatal("Bye.")   // calls os.Exit(1) after logging
logrus.Panic("I'm bailing.") // calls panic() after logging

Example project layout

main.go

package main

import (
	"github.com/sirupsen/logrus"

	"logt/logs"
)

func main() {
	// create a hook, passing in the path where logs should be stored
	hook := logs.NewHook("d:/log/golog.log")
	// log before the hook is registered
	logrus.WithField("file", "d:/log/golog.log").Info("new logrus hook.")
	logrus.AddHook(hook)
	// log after the hook is registered
	logrus.WithFields(logrus.Fields{
		"animal": "walrus",
	}).Info("A walrus appears")
}

hook.go

Don't be put off by the length of the three .go files below; most of it is boilerplate. NewHook is the only function you normally extend with your own definitions (see the sketch at the end of hook.go).

package logs

import (
	"fmt"
	"os"
	"strings"

	"github.com/sirupsen/logrus"
)

// Hook is a logrus hook that writes entries to a file.
type Hook struct {
	w LoggerInterface
}

// NewHook builds a Hook writing to the given file.
func NewHook(file string) *Hook {
	w := NewFileWriter()
	config := fmt.Sprintf(`{"filename":"%s","maxdays":7}`, file)
	if err := w.Init(config); err != nil {
		return nil
	}
	return &Hook{w}
}

// Fire implements the logrus.Hook interface.
func (hook *Hook) Fire(entry *logrus.Entry) error {
	message, err := getMessage(entry)
	if err != nil {
		fmt.Fprintf(os.Stderr, "unable to read entry, %v", err)
		return err
	}
	switch entry.Level {
	case logrus.PanicLevel:
		fallthrough
	case logrus.FatalLevel:
		fallthrough
	case logrus.ErrorLevel:
		return hook.w.WriteMsg(fmt.Sprintf("[ERROR] %s", message), LevelError)
	case logrus.WarnLevel:
		return hook.w.WriteMsg(fmt.Sprintf("[WARN] %s", message), LevelWarn)
	case logrus.InfoLevel:
		return hook.w.WriteMsg(fmt.Sprintf("[INFO] %s", message), LevelInfo)
	case logrus.DebugLevel:
		return hook.w.WriteMsg(fmt.Sprintf("[DEBUG] %s", message), LevelDebug)
	default:
		return nil
	}
}

// Levels implements the logrus.Hook interface.
func (hook *Hook) Levels() []logrus.Level {
	return []logrus.Level{
		logrus.PanicLevel,
		logrus.FatalLevel,
		logrus.ErrorLevel,
		logrus.WarnLevel,
		logrus.InfoLevel,
		logrus.DebugLevel,
	}
}

func getMessage(entry *logrus.Entry) (message string, err error) {
	message = message + fmt.Sprintf("%s ", entry.Message)
	file, lineNumber := getCallerIgnoringLogMulti(2)
	if file != "" {
		sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
		fileName := strings.Split(file, sep)
		if len(fileName) >= 2 {
			file = fileName[1]
		}
	}
	message = fmt.Sprintf("%s:%d ", file, lineNumber) + message

	for k, v := range entry.Data {
		message = message + fmt.Sprintf("%v:%v ", k, v)
	}
	return
}
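
Since NewHook is the usual extension point, here is a sketch of a hypothetical variant that surfaces more of the writer's rotation options; the JSON keys mirror the json tags on LogWriter in file.go below, but the function itself is not part of the original code:

// newHookWithRotation is illustrative; it could be added to hook.go.
func newHookWithRotation(file string, maxDays int64, maxSize int) *Hook {
	w := NewFileWriter()
	// %q quotes the path so it survives JSON encoding
	config := fmt.Sprintf(`{"filename":%q,"maxdays":%d,"maxsize":%d,"daily":true,"rotate":true}`, file, maxDays, maxSize)
	if err := w.Init(config); err != nil {
		return nil
	}
	return &Hook{w}
}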

caller.go

package logs

import (
	"runtime"
	"strings"
)

func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
	// bump by 1 to ignore the getCaller (this) stack frame
	callDepth++
outer:
	for {
		var ok bool
		_, file, line, ok = runtime.Caller(callDepth)
		if !ok {
			file = "???"
			line = 0
			break
		}

		for _, s := range suffixesToIgnore {
			if strings.HasSuffix(file, s) {
				callDepth++
				continue outer
			}
		}
		break
	}
	return
}

// getCallerIgnoringLogMulti returns the first caller outside of logrus itself.
func getCallerIgnoringLogMulti(callDepth int) (string, int) {
	// the +1 is to ignore this (getCallerIgnoringLogMulti) frame
	return getCaller(callDepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
}

file.go

package logs

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// RFC5424 log message levels.
const (
	LevelError = iota
	LevelWarn
	LevelInfo
	LevelDebug
)

// LoggerInterface is the interface the hook writes through.
type LoggerInterface interface {
	Init(config string) error
	WriteMsg(msg string, level int) error
	Destroy()
	Flush()
}

// LogWriter implements LoggerInterface.
// It writes messages subject to a line limit, a file-size limit, or a time
// frequency, rotating the file when a limit is hit. The configurable fields
// are exported so json.Unmarshal can populate them.
type LogWriter struct {
	*log.Logger
	mw *MuxWriter
	// the opened file
	Filename string `json:"filename"`

	MaxLines         int `json:"maxlines"`
	maxLinesCurLines int

	// rotate at size
	MaxSize        int `json:"maxsize"`
	maxSizeCurSize int

	// rotate daily
	Daily         bool  `json:"daily"`
	MaxDays       int64 `json:"maxdays"`
	dailyOpenDate int

	Rotate bool `json:"rotate"`

	startLock sync.Mutex // only one goroutine may update rotation state at a time

	Level int `json:"level"`
}

// MuxWriter is an *os.File writer guarded by a mutex.
type MuxWriter struct {
	sync.Mutex
	fd *os.File
}

// Write writes to the underlying os.File under the lock.
func (l *MuxWriter) Write(b []byte) (int, error) {
	l.Lock()
	defer l.Unlock()
	return l.fd.Write(b)
}

// SetFd swaps in a new os.File, closing any previous one.
func (l *MuxWriter) SetFd(fd *os.File) {
	if l.fd != nil {
		_ = l.fd.Close()
	}
	l.fd = fd
}

// NewFileWriter creates a LogWriter returned as LoggerInterface.
func NewFileWriter() LoggerInterface {
	w := &LogWriter{
		Filename: "",
		MaxLines: 1000000,
		MaxSize:  1 << 28, // 256 MB
		Daily:    true,
		MaxDays:  7,
		Rotate:   true,
		Level:    LevelDebug,
	}
	// use MuxWriter instead of an os.File directly so writes are locked
	// while the file rotates
	w.mw = new(MuxWriter)
	// set MuxWriter as the logger's io.Writer
	w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
	return w
}

// Init configures the file logger with a JSON config string, e.g.:
// {
//   "filename": "logs/sample.log",
//   "maxlines": 10000,
//   "maxsize":  1<<30,
//   "daily":    true,
//   "maxdays":  15,
//   "rotate":   true
// }
func (w *LogWriter) Init(jsonConfig string) error {
	if err := json.Unmarshal([]byte(jsonConfig), w); err != nil {
		return err
	}
	if len(w.Filename) == 0 {
		return errors.New("jsonConfig must have filename")
	}
	return w.startLogger()
}

// startLogger creates the log file and hands it to the locked file writer.
func (w *LogWriter) startLogger() error {
	fd, err := w.createLogFile()
	if err != nil {
		return err
	}
	w.mw.SetFd(fd)
	return w.initFd()
}

func (w *LogWriter) doCheck(size int) {
	w.startLock.Lock()
	defer w.startLock.Unlock()
	if w.Rotate && ((w.MaxLines > 0 && w.maxLinesCurLines >= w.MaxLines) ||
		(w.MaxSize > 0 && w.maxSizeCurSize >= w.MaxSize) ||
		(w.Daily && time.Now().Day() != w.dailyOpenDate)) {
		if err := w.doRotate(); err != nil {
			fmt.Fprintf(os.Stderr, "LogWriter(%q): %s\n", w.Filename, err)
			return
		}
	}
	w.maxLinesCurLines++
	w.maxSizeCurSize += size
}

// WriteMsg writes a logger message into the file.
func (w *LogWriter) WriteMsg(msg string, level int) error {
	if level > w.Level {
		return nil
	}
	n := 24 + len(msg) // 24 is the length of the "2013/06/23 21:00:22 [T] " prefix
	w.doCheck(n)
	w.Logger.Print(msg)
	return nil
}

func (w *LogWriter) createLogFile() (*os.File, error) {
	// open the log file, creating it if necessary
	return os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
}

func (w *LogWriter) initFd() error {
	fd := w.mw.fd
	fInfo, err := fd.Stat()
	if err != nil {
		return fmt.Errorf("get stat err: %s", err)
	}
	w.maxSizeCurSize = int(fInfo.Size())
	w.dailyOpenDate = time.Now().Day()
	if fInfo.Size() > 0 {
		content, err := ioutil.ReadFile(w.Filename)
		if err != nil {
			return err
		}
		w.maxLinesCurLines = len(strings.Split(string(content), "\n"))
	} else {
		w.maxLinesCurLines = 0
	}
	return nil
}

// doRotate moves the current file aside and starts writing to a new one.
// The rotated file is named like xx.log.2013-01-01.002.
func (w *LogWriter) doRotate() error {
	_, err := os.Lstat(w.Filename)
	if err == nil { // file exists
		// find the next available number
		num := 1
		fName := ""
		for ; err == nil && num <= 999; num++ {
			fName = w.Filename + fmt.Sprintf(".%s.%03d", time.Now().Format("2006-01-02"), num)
			_, err = os.Lstat(fName)
		}
		// return an error if the last file checked still existed
		if err == nil {
			return fmt.Errorf("rotate: cannot find free log number to rename %s", w.Filename)
		}

		// block the logger's io.Writer
		w.mw.Lock()
		defer w.mw.Unlock()

		// close fd before renaming
		fd := w.mw.fd
		_ = fd.Close()

		// rename the file to its newfound home
		err = os.Rename(w.Filename, fName)
		if err != nil {
			return fmt.Errorf("rotate: %s", err)
		}

		// restart the logger on a fresh file
		err = w.startLogger()
		if err != nil {
			return fmt.Errorf("rotate startLogger: %s", err)
		}

		go w.deleteOldLog()
	}

	return nil
}

func (w *LogWriter) deleteOldLog() {
	dir := filepath.Dir(w.Filename)
	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) (returnErr error) {
		defer func() {
			if r := recover(); r != nil {
				returnErr = fmt.Errorf("unable to delete old log '%s', error: %+v", path, r)
				fmt.Println(returnErr)
			}
		}()

		if !info.IsDir() && info.ModTime().Unix() < (time.Now().Unix()-60*60*24*w.MaxDays) {
			if strings.HasPrefix(filepath.Base(path), filepath.Base(w.Filename)) {
				_ = os.Remove(path)
			}
		}
		return
	})
}

// Destroy closes the file writer.
func (w *LogWriter) Destroy() {
	_ = w.mw.fd.Close()
}

// Flush syncs the file to disk. This logger keeps no messages buffered
// in memory, so flushing only means calling File.Sync.
func (w *LogWriter) Flush() {
	_ = w.mw.fd.Sync()
}

Bonus: custom logrus hooks in Go — a log-slicing hook, an email-alert hook, and a Kafka hook

logrus hook analysis

The logrus hook interface is very simple:

package logrus

// A hook to be fired when logging on the logging levels returned from
// `Levels()` on your implementation of the interface. Note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `Levels()` to block.
type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}

// Internal type for storing the hooks on a logger instance.
type LevelHooks map[Level][]Hook

// Add a hook to an instance of logger. This is called with
// `log.Hooks.Add(new(MyHook))` where `MyHook` implements the `Hook` interface.
func (hooks LevelHooks) Add(hook Hook) {
	for _, level := range hook.Levels() {
		hooks[level] = append(hooks[level], hook)
	}
}

// Fire all the hooks for the passed level. Used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks LevelHooks) Fire(level Level, entry *Entry) error {
	for _, hook := range hooks[level] {
		if err := hook.Fire(entry); err != nil {
			return err
		}
	}
	return nil
}

You only need to implement this interface:

type Hook interface {
	Levels() []Level
	Fire(*Entry) error
}

and the logrus framework will iterate over the registered hooks and call each one's Fire method.
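
As a sanity check, a minimal hook satisfying this interface fits in a few lines. This is an illustrative sketch, not part of the original code; it simply mirrors every entry to stderr:

package main

import (
	"fmt"
	"os"

	"github.com/sirupsen/logrus"
)

// StderrHook mirrors every log entry to stderr.
type StderrHook struct{}

func (h *StderrHook) Levels() []logrus.Level { return logrus.AllLevels }

func (h *StderrHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String() // rendered with the logger's formatter
	if err != nil {
		return err
	}
	_, err = fmt.Fprint(os.Stderr, line)
	return err
}

func main() {
	logger := logrus.New()
	logger.Out = os.Stdout // normal output goes to stdout
	logger.AddHook(&StderrHook{})
	logger.Info("hello") // written to stdout and mirrored to stderr by the hook
}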

Getting a logger instance

// log_hook.go
package logger

import (
	"fmt"
	"os"

	"github.com/sirupsen/logrus"

	"library/util/constant"
)

// getLogger builds a logrus instance with the custom hooks attached.
func getLogger(module string) *logrus.Logger {
	// instantiate
	logger := logrus.New()
	// set the output
	logger.Out = os.Stdout
	// set the log level
	logger.SetLevel(logrus.DebugLevel)
	// a custom writer handles the slicing; the hook plumbing is left to lfshook
	logger.AddHook(newLogrusHook(constant.GetLogPath(), module))
	// set the log format
	logger.SetFormatter(&logrus.JSONFormatter{
		TimestampFormat: "2006-01-02 15:04:05",
	})
	return logger
}

// GetNewFieldLoggerContext ensures every caller gets its own logger and file.
func GetNewFieldLoggerContext(module, appField string) *logrus.Entry {
	logger := getLogger(module)
	return logger.WithFields(logrus.Fields{
		"app": appField,
	})
}

// SubscribeLog subscribes the given receivers to alert-level log entries.
func SubscribeLog(entry *logrus.Entry, subMap SubscribeMap) {
	logger := entry.Logger
	logger.AddHook(NewSubscribeHook(subMap))
	fmt.Println("log subscription registered")
}

constant.GetLogPath() can be replaced with your own log output directory; on my Mac, for example, it is /usr/local/log.
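
The constant package itself is not shown in this article; a minimal hypothetical stand-in covering the identifiers used here and in the Kafka section below could look like this (names and values are assumptions):

// library/util/constant/constant.go -- hypothetical stand-in
package constant

const (
	KafkaLogELKTopic = "log-elk"     // assumed topic name
	KafkaAliSDKTopic = "ali-sdk-log" // assumed topic name
)

func GetLogPath() string {
	return "/usr/local/log" // adjust for your environment
}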

Log-slicing hook

Code

// writer.go
package logger

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/pkg/errors"
)

type logWriter struct {
	logDir           string // root directory for log files
	module           string // module name
	curFileName      string // file name currently selected
	curBaseFileName  string // file currently in use
	truncateDuration time.Duration
	mutex            sync.RWMutex
	outFh            *os.File
}

func (w *logWriter) Write(p []byte) (n int, err error) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	out, err := w.getWriter()
	if err != nil {
		return 0, errors.New("failed to fetch target io.Writer")
	}
	return out.Write(p)
}

func (w *logWriter) getFileName() string {
	base := time.Now().Truncate(w.truncateDuration)
	return fmt.Sprintf("%s/%s/%s_%s", w.logDir, base.Format("2006-01-02"), w.module, base.Format("15"))
}

func (w *logWriter) getWriter() (io.Writer, error) {
	// a new name appears once a truncation boundary is crossed
	fileName := w.getFileName()
	// keep using the open handle while the name is unchanged
	if fileName == w.curBaseFileName && w.outFh != nil {
		return w.outFh, nil
	}

	dirName := filepath.Dir(fileName)
	if err := os.MkdirAll(dirName, 0755); err != nil {
		return nil, errors.Wrapf(err, "failed to create directory %s", dirName)
	}

	fileHandler, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		return nil, errors.Errorf("failed to open file %s", err)
	}
	if w.outFh != nil {
		_ = w.outFh.Close()
	}
	w.outFh = fileHandler
	w.curBaseFileName = fileName
	w.curFileName = fileName

	return fileHandler, nil
}

func newLogWriter(logPath, module string, duration time.Duration) *logWriter {
	return &logWriter{
		logDir:           logPath,
		module:           module,
		truncateDuration: duration,
		curFileName:      "",
		curBaseFileName:  "",
	}
}
// hook.go
package logger

import (
	"time"

	"github.com/rifflock/lfshook"
	"github.com/sirupsen/logrus"
)

func newLogrusHook(logPath, module string) logrus.Hook {
	logrus.SetLevel(logrus.WarnLevel)

	writer := newLogWriter(logPath, module, time.Hour*2)

	hook := lfshook.NewHook(lfshook.WriterMap{
		logrus.DebugLevel: writer,
		logrus.InfoLevel:  writer,
		logrus.WarnLevel:  writer,
		logrus.ErrorLevel: writer,
		logrus.FatalLevel: writer,
		logrus.PanicLevel: writer,
	}, &logrus.TextFormatter{DisableColors: true})

	// writer picks the target log file for each entry; lfshook consumes the
	// Fire call and forwards the formatted entry to our io.Writer, so the
	// pre-processing happens inside Write.
	return hook
}

Test code

func TestGetLogger(t *testing.T) {
	lg := GetNewFieldLoggerContext("test", "d")
	lg.Logger.Info("????")
}

How it works

The logger instance holds our custom io.Writer struct. When logrus consumes the Fire call, the Write method is invoked, and the Truncate-based time-slicing logic decides which file to write to, creating a new one when a boundary is crossed.

Note: the code in this article creates one folder per day, and module logs inside that folder are further split every 2 hours. You can change this to split by module instead.
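
To make the bucketing concrete, here is a small standalone sketch of the time.Truncate logic that getFileName relies on (the timestamps are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	d := 2 * time.Hour
	for _, ts := range []string{"2021-09-21T10:59:00Z", "2021-09-21T11:01:00Z"} {
		t, _ := time.Parse(time.RFC3339, ts)
		base := t.Truncate(d) // both timestamps fall into the 10:00 bucket
		fmt.Printf("%s -> %s_%s\n", ts, base.Format("2006-01-02"), base.Format("15"))
	}
}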

Email-alert hook

Code

// subscribehook.go
package logger

import (
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"

	"library/email"
)

type SubscribeMap map[logrus.Level][]*email.Receiver

type SubscribeHook struct {
	subMap SubscribeMap
}

// This hook is self-implemented; a third-party hook could be used instead.
func (h *SubscribeHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *SubscribeHook) Fire(entry *logrus.Entry) error {
	for level, receivers := range h.subMap {
		// the level matches: send the alert
		if level == entry.Level {
			if len(receivers) > 0 {
				email.SendEmail(receivers, fmt.Sprintf("%s: [system log alert]", entry.Level.String()),
					fmt.Sprintf("error content: %s", entry.Message))
			}
		}
	}
	return nil
}

func NewSubscribeMap(level logrus.Level, receiverStr string) SubscribeMap {
	subMap := SubscribeMap{}
	addressList := strings.Split(receiverStr, ";")
	var receivers []*email.Receiver
	for _, address := range addressList {
		receivers = append(receivers, &email.Receiver{Email: address})
	}
	subMap[level] = receivers
	return subMap
}

func NewSubscribeHook(subMap SubscribeMap) *SubscribeHook {
	return &SubscribeHook{subMap}
}
// email.go
package email

import (
	"fmt"
	"regexp"

	"gopkg.in/gomail.v2"
)

type Sender struct {
	user     string
	password string
	host     string
	port     int
	mailTo   []string
	subject  string
	content  string
}

type Receiver struct {
	Email string
}

// Check validates the e-mail address format.
func (r *Receiver) Check() bool {
	pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` // matches an e-mail address
	reg := regexp.MustCompile(pattern)
	return reg.MatchString(r.Email)
}

func (s *Sender) clean() {
	// currently unused
}

// NewReceiver checks the address before accepting it.
func (s *Sender) NewReceiver(email string) *Receiver {
	rec := &Receiver{Email: email}
	if rec.Check() {
		s.mailTo = []string{email}
		return rec
	}
	fmt.Printf("email check fail [%s]\n", email)
	return nil
}

func (s *Sender) NewReceivers(receivers []*Receiver) {
	for _, rec := range receivers {
		if rec.Check() {
			s.mailTo = append(s.mailTo, rec.Email)
		} else {
			fmt.Printf("email check fail [%s]\n", rec.Email)
		}
	}
}

// For a 163 mailbox, password is the key issued when SMTP access is enabled.
var m = Sender{user: "6666666@163.com", password: "666666666", host: "smtp.163.com", port: 465}

func SendEmail(receivers []*Receiver, subject, content string) {
	m.NewReceivers(receivers)
	m.subject = subject
	m.content = content

	e := gomail.NewMessage()
	e.SetHeader("From", e.FormatAddress(m.user, "hengsheng"))
	e.SetHeader("To", m.mailTo...)    // send to multiple recipients
	e.SetHeader("Subject", m.subject) // subject line
	e.SetBody("text/html", m.content) // body
	d := gomail.NewDialer(m.host, m.port, m.user, m.password)
	if err := d.DialAndSend(e); err != nil {
		fmt.Printf("error sending mail! %s \n", err.Error())
	}
}

Usage

In the same spirit as the writer, error-level log entries now trigger an email:

o.Logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribeSocket {
	logger.SubscribeLog(o.Logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
// o is the actual struct instance

Kafka hook

// kafkahook.go
package logger

import (
	"github.com/sirupsen/logrus"

	"library/kafka"
	"library/util/constant"
)

type KafkaHook struct {
	kafkaProducer *kafka.KafkaProducer
}

func (h *KafkaHook) Levels() []logrus.Level {
	return logrus.AllLevels
}

func (h *KafkaHook) Fire(entry *logrus.Entry) error {
	h.kafkaProducer.SendMsgSync(entry.Message)
	return nil
}

func NewKafkaHook() *KafkaHook {
	producer := kafka.NewKafkaProducer(constant.KafkaLogELKTopic, true)
	return &KafkaHook{kafkaProducer: producer}
}

To use it, just call logger.AddHook(NewKafkaHook()) when setting up the logger.
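
Inside package logger, the wiring might look like this sketch (module and field names are illustrative, reusing getLogger from log_hook.go above):

logger := getLogger("order")
logger.AddHook(NewKafkaHook())
logger.WithField("order_id", 42).Error("payment failed") // also produced to Kafka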

The kafka module

Producer

// kafkaproducer.go
package kafka

import (
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func getKafkaAddress() []string {
	return []string{"127.0.0.1:9092"}
}

// SyncProducer sends a single message in synchronous mode.
func SyncProducer(topic, message string) error {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(getKafkaAddress(), config)
	if err != nil {
		return fmt.Errorf("sarama.NewSyncProducer err, message=%s", err)
	}
	defer p.Close()
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(message),
	}
	part, offset, err := p.SendMessage(msg)
	if err != nil {
		return fmt.Errorf("send message err=%s", err)
	}
	fmt.Printf("sent OK, partition=%d, offset=%d \n", part, offset)
	return nil
}

// KafkaProducer wraps either a sync or an async producer.
type KafkaProducer struct {
	topic         string
	asyncProducer sarama.AsyncProducer
	syncProducer  sarama.SyncProducer
	sync          bool
}

func NewKafkaProducer(topic string, sync bool) *KafkaProducer {
	k := &KafkaProducer{
		topic: topic,
		sync:  sync,
	}
	if sync {
		k.initSync()
	} else {
		k.initAsync()
	}
	return k
}

func (k *KafkaProducer) initAsync() bool {
	if k.sync {
		fmt.Printf("sync producer can't call async func!\n")
		return false
	}
	config := sarama.NewConfig()
	// wait for all in-sync replicas to acknowledge the message
	config.Producer.RequiredAcks = sarama.WaitForAll
	// send each message to a random partition
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// whether to wait for success and failure responses; only meaningful
	// when RequiredAcks above is not NoResponse
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// set the kafka version in use; below V0_10_0_0 message timestamps are
	// ignored, and producer and consumer must agree on this setting.
	// Beware: with a wrong version, kafka returns odd errors and sending fails.
	config.Version = sarama.V0_10_0_1

	producer, e := sarama.NewAsyncProducer(getKafkaAddress(), config)
	if e != nil {
		fmt.Println(e)
		return false
	}
	k.asyncProducer = producer
	// drain the response channels; close the producer on shutdown, not here,
	// otherwise it would shut down before anything is sent
	go func() {
		for {
			select {
			case <-producer.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-producer.Errors():
				fmt.Printf("err: %s \n", fail.Err.Error())
			}
		}
	}()

	return true
}

func (k *KafkaProducer) initSync() bool {
	if !k.sync {
		fmt.Println("async producer can't call sync func!")
		return false
	}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Timeout = 5 * time.Second
	p, err := sarama.NewSyncProducer(getKafkaAddress(), config)
	if err != nil {
		log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
		return false
	}
	k.syncProducer = p
	return true
}

func (k *KafkaProducer) SendMsgAsync(sendStr string) {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
	}

	// convert the string into a byte array
	msg.Value = sarama.ByteEncoder(sendStr)

	// send through the producer's input channel
	k.asyncProducer.Input() <- msg
}

func (k *KafkaProducer) SendMsgSync(sendStr string) bool {
	msg := &sarama.ProducerMessage{
		Topic: k.topic,
		Value: sarama.ByteEncoder(sendStr),
	}
	part, offset, err := k.syncProducer.SendMessage(msg)
	if err != nil {
		fmt.Printf("send failed: message(%s) err=%s \n", sendStr, err)
		return false
	}
	fmt.Printf("sent OK, partition=%d, offset=%d \n", part, offset)
	return true
}

Call SendMsgSync or SendMsgAsync to produce messages; make sure the sync flag passed at construction matches the method you call!
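
A quick sketch of both modes (the topic name is illustrative):

// sync: the constructor flag and the send method must agree
syncPd := NewKafkaProducer("demo-topic", true)
syncPd.SendMsgSync("hello, sync")

// async: the message goes through the producer's Input() channel
asyncPd := NewKafkaProducer("demo-topic", false)
asyncPd.SendMsgAsync("hello, async")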

Consumer group

// kafkaconsumergroup.go

package kafka

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/Shopify/sarama"
)

// NewKafkaConsumerGroup builds the consumer group; the caller starts it with
// Init and stops it via the cleanup function Init returns.
func NewKafkaConsumerGroup(topics []string, group string, businessCall func(message *sarama.ConsumerMessage) bool) *KafkaConsumerGroup {
	k := &KafkaConsumerGroup{
		brokers:           getKafkaAddress(),
		topics:            topics,
		group:             group,
		channelBufferSize: 2,
		ready:             make(chan bool),
		version:           "1.1.1",
		handler:           businessCall,
	}
	return k
}

// Consumer group: consumers sharing the same group.id are treated as one
// group. Every consumer must set a group id; each message is consumed by
// exactly one consumer within a group, but may be consumed by several groups.
type KafkaConsumerGroup struct {
	// broker: a single kafka server is one broker
	brokers []string
	// topic: a logical grouping of messages; messages of the same topic
	// are placed in the same queue
	topics            []string
	version           string
	ready             chan bool
	group             string
	channelBufferSize int
	// business callback
	handler func(message *sarama.ConsumerMessage) bool
}

// Init starts consuming and returns a cleanup function for shutdown.
func (k *KafkaConsumerGroup) Init() func() {
	version, err := sarama.ParseKafkaVersion(k.version)
	if err != nil {
		fmt.Printf("error parsing kafka version: %v", err)
	}
	cfg := sarama.NewConfig()
	cfg.Version = version
	// partition assignment strategy
	cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
	// where to start when no committed offset exists (OffsetOldest = -2)
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// channel length
	cfg.ChannelBufferSize = k.channelBufferSize
	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup(k.brokers, k.group, cfg)
	if err != nil {
		fmt.Printf("error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer func() {
			wg.Done()
			//util.HandlePanic("client.Consume panic", log.StandardLogger())
		}()
		for {
			if err := client.Consume(ctx, k.topics, k); err != nil {
				log.Printf("error from consumer: %v", err)
			}
			// check if the context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Println(ctx.Err())
				return
			}
			k.ready = make(chan bool)
		}
	}()

	<-k.ready
	fmt.Printf("sarama consumer up and running!... \n")
	// make sure in-flight messages are consumed before the process exits
	return func() {
		cancel()
		wg.Wait()
		if err = client.Close(); err != nil {
			fmt.Printf("error closing client: %v \n", err)
		}
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (k *KafkaConsumerGroup) Setup(sarama.ConsumerGroupSession) error {
	// mark the consumer as ready
	close(k.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (k *KafkaConsumerGroup) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop over ConsumerGroupClaim's Messages().
func (k *KafkaConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// NOTE:
	// Do not move the code below to a goroutine.
	// `ConsumeClaim` itself is called within a goroutine, see:
	// http://www.51sjk.com/Upload/Articles/1/0/251/251882_20210626002010547.go#l27-l29
	// consume the messages
	for message := range claim.Messages() {
		//msg := string(message.Value)
		//k.logger.Infof("kafka: %s", msg)

		if ok := k.handler(message); ok {
			// commit the offset
			session.MarkMessage(message, "")
		}
		//run.Run(msg)
	}
	return nil
}

Test code

func TestKafkaConsumerGroup_Init(t *testing.T) {
	//pd := NewKafkaProducer("test-fail", true)
	k := NewKafkaConsumerGroup([]string{constant.KafkaAliSDKTopic}, "group-2", func(message *sarama.ConsumerMessage) bool {
		fmt.Println(string(message.Value))
		// on failure, compensation logic could go here, e.g. re-produce:
		//if ok := pd.SendMsgSync("666666"); ok {
		//	return true
		//}
		return false
	})
	consumerDone := k.Init()

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
	fmt.Println("terminating: via signal")
	consumerDone()
}

There is room for compensation logic here: when the handler returns false, the offset is not committed, so the message can be retried or re-produced to a retry topic, as the commented-out code hints.

That covers the logrus-related hooks.

So much for this walkthrough of writing logrus logs to local disk with a hook. I hope it gives you a useful reference.
