分布式任务队列
1、概述
asynq
是用golang
基于redis
编写的分布式任务队列。旨在实现可扩展且易于上手。
项目地址:https://github.com/hibiken/asynq
ui
地址:https://github.com/hibiken/asynqmon
asynq
工作原理:
- 客户端将任务放入队列
- 服务器从队列中拉去任务并为每个任务启动一个工作协程
- 任务由多个worker同时处理
在go项目中选择asynq
的理由:
- 直接基于
redis
,一半项目都有redis
,而asynq
本身就是基于redis
,所以可以少维护一个中间件 - 支持消息队列,延迟队列,定时任务调度,满足中小型项目需求(业务量不大)
- 有
webui
界面,每个任务都可以暂停,归档,通过ui
可以查看到成功失败,监控
2、环境搭建
环境搭建,这里选择docker-compose构建asynq
+asynqmon
,构建完成之后,可以在浏览器访问localhost:8980
version: '3'
services:
rds:
image: redis:6.2.5
container_name: redis
ports:
- 36379:6379
environment:
# 时区上海 - Time zone Shanghai (Change if needed)
TZ: Asia/Shanghai
command: "redis-server --requirepass 111111 --appendonly yes"
privileged: true
restart: always
asynqmon:
image: hibiken/asynqmon:latest
container_name: asynqmon
ports:
- 8980:8080
command:
- '--redis-addr=rds:6379'
- '--redis-password=111111'
restart: always
depends_on:
- redis
构建成功,可以看到结果
3、使用案例
定义异步任务
package task
import (
"context"
"encoding/json"
"github.com/hibiken/asynq"
"log"
"time"
)
const OrderTask = "asynq:order_task"
// 异步任务携带的负载
type Payload struct {
Order string
Req int
}
func NewTask(payload *Payload) (*asynq.Task, error) {
log.Println("初始化任务")
data, _ := json.Marshal(payload)
return asynq.NewTask(OrderTask, data), nil
}
func HandleTask(ctx context.Context, t *asynq.Task) error {
log.Println("开始执行任务")
var payload Payload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return err
}
log.Printf("处理订单:%s", payload.Order)
time.Sleep(20 * time.Second)
return nil
}
生产者投递实时任务/延时任务
package main
import (
"asynqJob/task"
"fmt"
"log"
"time"
"github.com/hibiken/asynq"
)
func main() {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: "127.0.0.1:36379",
Password: "111111",
DB: 0,
})
defer client.Close()
for i := 0; i < 2; i++ {
if i/2 == 0 {
t, err := task.NewTask(&task.Payload{
Order: fmt.Sprintf("延迟订单订单编号:%s%d", time.Now().Format("20060102150405"), i+100),
Req: i,
})
if err != nil {
log.Println(err.Error())
}
// client.Enqueue(t,time.Now()) 实时任务
info, err := client.Enqueue(t, asynq.ProcessIn(1*time.Second))// 延时任务,1秒之后执行
if err != nil {
log.Println("投递失败", err)
} else {
log.Println("投递成功", info.ID)
}
}
}
}
服务端处理消息
package main
import (
"asynqJob/task"
"log"
"github.com/hibiken/asynq"
)
func main() {
srv := asynq.NewServer(
asynq.RedisClientOpt{
Addr: "127.0.0.1:36379",
Password: "111111",
DB: 0,
},
asynq.Config{Concurrency: 12,
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
}})
mux := asynq.NewServeMux()
mux.HandleFunc(task.OrderTask, task.HandleTask)
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server:%v", err)
}
}