Go操作kafka队列:sarama包
sarama是一个纯Go实现的kafka客户端,是目前GitHub上star最多的kafka Go客户端包
包地址:https://github.com/Shopify/sarama
文档地址:https://pkg.go.dev/github.com/Shopify/sarama
生产者
package main
import (
"fmt"
"log"
"github.com/Shopify/sarama"
)
// main connects a synchronous sarama producer to the broker at 127.0.0.1:9092
// and sends ten test messages to the "web_test" topic, logging the partition
// and offset of every message that was delivered successfully.
func main() {
	log.Println("启动生产者")

	config := sarama.NewConfig()
	// Require acknowledgement from the leader and its followers before a send counts as done.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Choose a random partition for each message.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// The sarama documentation requires Return.Successes to be true for a SyncProducer.
	config.Producer.Return.Successes = true

	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Println("链接错误", err.Error())
		return
	}
	defer func() {
		// Surface shutdown problems instead of silently dropping them.
		if err := client.Close(); err != nil {
			log.Println("producer close error:", err.Error())
		}
	}()

	for i := 0; i < 10; i++ {
		// Build one message per iteration.
		message := &sarama.ProducerMessage{
			Topic: "web_test",
			Value: sarama.StringEncoder(fmt.Sprintf("hello kafka,id=%d", i)),
		}
		// message.Partition = 0 // pin the message to a specific partition

		pid, offset, err := client.SendMessage(message)
		if err != nil {
			log.Println("消息投递失败,err:", err.Error())
			// pid and offset are meaningless on failure, so skip logging them.
			continue
		}
		log.Println(pid, offset)
	}
}
消费者
普通订阅消费者:在普通消费者模式下,一条消息会被所有的订阅者获取到,即广播模式
package main
import (
"log"
"github.com/Shopify/sarama"
)
// main connects a plain (non-group) sarama consumer to the broker at
// 127.0.0.1:9092, starts one partition consumer per partition of the
// "web_test" topic beginning at the newest offset, and logs every message
// received. It blocks forever once all partition consumers are running.
func main() {
	config := sarama.NewConfig()
	// NOTE(review): auto-commit presumably only matters when offsets are
	// managed (e.g. consumer groups); confirm whether it is needed here.
	config.Consumer.Offsets.AutoCommit.Enable = true

	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Println("fail to connect consumer,err:", err.Error())
		return
	}
	log.Println("连接成功")

	// Look up every partition ID of the topic.
	partitionList, err := consumer.Partitions("web_test")
	if err != nil {
		log.Println("fail to get list of partition", err.Error())
		return
	}
	log.Println("获取topic")

	// Range over the partition IDs themselves, not the slice indices: the IDs
	// returned by Partitions are what ConsumePartition expects.
	for _, part := range partitionList {
		log.Println("监听分区")
		// Create a dedicated partition consumer for this partition.
		pc, err := consumer.ConsumePartition("web_test", part, sarama.OffsetNewest)
		if err != nil {
			log.Println("fail to consume partition", part, err.Error())
			return
		}
		// Only runs if main returns early on a later error; otherwise main blocks below.
		defer pc.AsyncClose()

		go func(p sarama.PartitionConsumer, pt int32) {
			for msg := range p.Messages() {
				log.Printf("处理消息:%s,partition:%d,offset:%d", string(msg.Value), pt, msg.Offset)
			}
		}(pc, part)
	}

	// Block forever so the consuming goroutines keep running.
	select {}
}
消费者组:一条消息只能被同一个消费者组中的一个客户端消费
package main
import (
"context"
"log"
"github.com/Shopify/sarama"
)
// main joins the "group_test" consumer group on the broker at 127.0.0.1:9092
// and consumes the "web_test" topic with a GroupConsumer handler until
// Consume reports an error.
func main() {
	config := sarama.NewConfig()
	group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "group_test", config)
	if err != nil {
		log.Println("group connect error", err.Error())
		return
	}
	defer group.Close()
	log.Println("连接borken")

	consumer := &GroupConsumer{}
	ctx := context.Background()
	// Consume returns whenever the session ends (for example after a
	// server-side rebalance), so it has to be called again in a loop to obtain
	// the new claims; calling it once would leave the program idle afterwards.
	for {
		if err := group.Consume(ctx, []string{"web_test"}, consumer); err != nil {
			log.Println("group consume error", err.Error())
			return
		}
	}
}
// GroupConsumer is the stateless handler passed to the consumer group; its
// Setup, Cleanup and ConsumeClaim methods are the callbacks sarama invokes.
type GroupConsumer struct{}
// Setup logs "setup" and reports no error. The session argument is not used.
func (g *GroupConsumer) Setup(session sarama.ConsumerGroupSession) error {
	const event = "setup"
	log.Println(event)
	return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines
// have exited but before the offsets are committed for the very last time.
// This implementation only logs "cleanup" and reports no error; the session
// argument is not used.
func (g *GroupConsumer) Cleanup(session sarama.ConsumerGroupSession) error {
	const event = "cleanup"
	log.Println(event)
	return nil
}
// ConsumeClaim runs the consumer loop over the claim's Messages() channel.
// Each message is logged and then marked as consumed on the session. The loop
// ends, and nil is returned, when the Messages() channel is closed or when the
// session context is done, whichever happens first. "process session" is
// logged on the way out.
func (*GroupConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	defer log.Println("process session")
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim is over, stop processing.
				return nil
			}
			log.Println("处理消息:", string(message.Value))
			// Mark the message as consumed; the empty string is the metadata argument.
			sess.MarkMessage(message, "")
		case <-sess.Context().Done():
			// Exit promptly when the session ends (e.g. on rebalance) instead
			// of waiting for the message channel to be closed.
			return nil
		}
	}
}
以上就是golang简单操作kafka队列的示例