asynq分布式任务队列

程序员卷不动了 2023-02-28 PM 280℃ 0条

分布式任务队列

1、概述

asynq是用golang基于redis编写的分布式任务队列。旨在实现可扩展且易于上手。

项目地址:https://github.com/hibiken/asynq

ui地址:https://github.com/hibiken/asynqmon

asynq工作原理:

  • 客户端将任务放入队列
  • 服务器从队列中拉去任务并为每个任务启动一个工作协程
  • 任务由多个worker同时处理

asynq

在go项目中选择asynq的理由:

  • 直接基于redis,一半项目都有redis,而asynq本身就是基于redis,所以可以少维护一个中间件
  • 支持消息队列,延迟队列,定时任务调度,满足中小型项目需求(业务量不大)
  • webui界面,每个任务都可以暂停,归档,通过ui可以查看到成功失败,监控
2、环境搭建

环境搭建,这里选择docker-compose构建asynq+asynqmon,构建完成之后,可以在浏览器访问localhost:8980

version: '3'

services:
  rds:
    image: redis:6.2.5
    container_name: redis
    ports:
      - 36379:6379
    environment:
      # 时区上海 - Time zone Shanghai (Change if needed)
      TZ: Asia/Shanghai
    command: "redis-server --requirepass 111111  --appendonly yes"
    privileged: true
    restart: always
  
  asynqmon:
    image: hibiken/asynqmon:latest
    container_name: asynqmon
    ports:
      - 8980:8080
    command:
      - '--redis-addr=rds:6379'
      - '--redis-password=111111'
    restart: always
    depends_on:
      - redis

构建成功,可以看到结果

ui

3、使用案例

定义异步任务

package task

import (
    "context"
    "encoding/json"
    "github.com/hibiken/asynq"
    "log"
    "time"
)

const OrderTask = "asynq:order_task"

// 异步任务携带的负载
type Payload struct {
    Order string
    Req   int
}

func NewTask(payload *Payload) (*asynq.Task, error) {
    log.Println("初始化任务")
    data, _ := json.Marshal(payload)
    return asynq.NewTask(OrderTask, data), nil
}

func HandleTask(ctx context.Context, t *asynq.Task) error {

    log.Println("开始执行任务")
    var payload Payload
    if err := json.Unmarshal(t.Payload(), &payload); err != nil {
        return err
    }
    log.Printf("处理订单:%s", payload.Order)
    time.Sleep(20 * time.Second)
    return nil
}

生产者投递实时任务/延时任务

package main

import (
    "asynqJob/task"
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
)

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{

        Addr:     "127.0.0.1:36379",
        Password: "111111",
        DB:       0,
    })

    defer client.Close()

    for i := 0; i < 2; i++ {
        if i/2 == 0 {
            t, err := task.NewTask(&task.Payload{
                Order: fmt.Sprintf("延迟订单订单编号:%s%d", time.Now().Format("20060102150405"), i+100),
                Req:   i,
            })
            if err != nil {
                log.Println(err.Error())
            }
                    // client.Enqueue(t,time.Now())  实时任务
            info, err := client.Enqueue(t, asynq.ProcessIn(1*time.Second))// 延时任务,1秒之后执行
            if err != nil {
                log.Println("投递失败", err)
            } else {
                log.Println("投递成功", info.ID)
            }
        }
    }
}

服务端处理消息

package main

import (
    "asynqJob/task"
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{
            Addr:     "127.0.0.1:36379",
            Password: "111111",
            DB:       0,
        },
        asynq.Config{Concurrency: 12,
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            }})

    mux := asynq.NewServeMux()
    mux.HandleFunc(task.OrderTask, task.HandleTask)
    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server:%v", err)
    }
}
标签: redis, golang, 消息队列, asynq

非特殊说明,本博所有文章均为博主原创。

评论啦~