golang使用sarama包操作kafka队列

程序员卷不动了 2023-02-25 AM 323℃ 0条
Go操作kafka队列,sarama

saram是一个纯go客户端,是目前githubstar最多的一个包

包地址:https://github.com/Shopify/sarama

文档地址:https://pkg.go.dev/github.com/Shopify/sarama

生产者
package main

import (
    "fmt"
    "log"
    "github.com/Shopify/sarama"
)

func main() {
    log.Println("启动生产者")
    var config = sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完消息需要leader和follow都确认,发送端确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    config.Producer.Return.Successes = true

    var client, err = sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        log.Println("链接错误", err.Error())
        return
    }
    defer client.Close()
    for i := 0; i < 10; i++ {
        var message = &sarama.ProducerMessage{} // 构造一个消息
        message.Topic = "web_test"
        message.Value = sarama.StringEncoder(fmt.Sprintf("hello kafka,id=%d", i))
        // message.Partition = 0 // 指定partition
        pid, offset, err := client.SendMessage(message)
        if err != nil {
            log.Println("消息投递失败,err:", err.Error())
        }
        log.Println(pid, offset)
    }
}
消费者

普通订阅消费者,普通消费者中,一条消息会被所有的订阅者获取到,广播模式

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    var config = sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = true
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        log.Println("fail to connect consumer,err:", err.Error())
        return
    } else {
        log.Println("连接成功")
    }

    partitionList, err := consumer.Partitions("web_test") // 根据topic获取到所有的分区

    if err != nil {
        log.Println("fail to get list of partition", err.Error())
        return
    }
    log.Println("获取topic")
    for part := range partitionList {
        log.Println("监听分区")
        // 针对每个分区创建一个对应的分区消费者
        pc, err := consumer.ConsumePartition("web_test", int32(part), sarama.OffsetNewest)
        if err != nil {
            return
        }
        defer pc.AsyncClose()
        go func(p sarama.PartitionConsumer, pt int) {
            for msg := range p.Messages() {
                log.Printf("处理消息:%s,partition:%d,offset:%d", string(msg.Value), pt, msg.Offset)
            }
        }(pc, part)
    }
    for {
        select {}
    }
}

消费者组,一条消息只能被同一个消费者组的一个客户端消费获取

package main

import (
    "context"
    "log"
    "github.com/Shopify/sarama"
)

func main() {
    var config = sarama.NewConfig()
    var group, err = sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "group_test", config)
    if err != nil {
        log.Println("group connect error")
        return
    }
    defer group.Close()
    log.Println("连接borken")
    var consumer = &GroupConsumer{}
    group.Consume(context.Background(), []string{"web_test"}, consumer)

    for {
        select {}

    }
}

type GroupConsumer struct {
}

func (*GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
    log.Println("setup")
    return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (*GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
    log.Println("cleanup")
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (*GroupConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Println("处理消息:", string(message.Value))
        sess.MarkMessage(message, "") // 标记消息为空
    }
    log.Println("process session")
    return nil
}

以上就是golang简单操作kafka队列

标签: golang

非特殊说明,本博所有文章均为博主原创。

评论啦~