main.go
package main
import (
"context"
"fmt"
"github.com/RichardKnop/machinery/v2"
//amqpbackend "github.com/RichardKnop/machinery/v2/backends/amqp"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
amqpbroker "github.com/RichardKnop/machinery/v2/brokers/amqp"
//redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
"github.com/RichardKnop/machinery/v2/config"
"github.com/RichardKnop/machinery/v2/example/tracers"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/log"
"github.com/RichardKnop/machinery/v2/tasks"
"github.com/opentracing/opentracing-go"
"gotest/itasks"
"time"
)
// main sends a task and blocks until the task result has been returned.
// Any error reported by send is printed to standard output.
func main() {
	err := send()
	if err == nil {
		return
	}
	fmt.Println(err.Error())
}
// startServer builds a machinery server with an AMQP broker, a Redis result
// backend and an eager lock, then registers the "add" task on it.
//
// The configuration is constructed inline; the original example notes that it
// could instead be loaded with config.NewFromYaml("./config.yml", false).
//
// It returns the server together with the error (if any) produced by
// RegisterTasks.
//
// NOTE(review): broker and Redis credentials are hard-coded here, which is
// only appropriate for a local demo.
func startServer() (*machinery.Server, error) {
	amqpCfg := &config.AMQPConfig{
		Exchange:     "machinery_exchange",
		ExchangeType: "direct",
		BindingKey:   "machinery_task",
		AutoDelete:   false,
	}
	cnf := &config.Config{
		Broker:        "amqp://admin:123456@127.0.0.1:31598/",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "amqp://admin:123456@127.0.0.1:31598/",
		AMQP:          amqpCfg,
	}

	// The broker, backend and lock are passed to the server explicitly.
	// A Redis broker or an AMQP backend could be swapped in here instead.
	server := machinery.NewServer(
		cnf,
		amqpbroker.New(cnf),
		redisbackend.NewGR(cnf, []string{"123456@127.0.0.1:36379"}, 10),
		eagerlock.New(),
	)

	// Register the tasks that workers are able to execute, keyed by name.
	err := server.RegisterTasks(map[string]interface{}{
		"add": mytasks.Add,
	})
	return server, err
}
// worker sets up a tracer, builds the server via startServer and launches a
// machinery worker that consumes tasks.
//
// It returns the error from startServer or from the worker's Launch call.
// A tracer setup failure is reported through log.FATAL.Fatalln instead of
// being returned.
func worker() error {
	const tag = "machinery_worker"

	closeTracer, err := tracers.SetupTracer(tag)
	if err != nil {
		log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
	}
	defer closeTracer()

	srv, err := startServer()
	if err != nil {
		return err
	}

	// The first argument is the consumer tag; ideally each worker should have
	// a unique one (worker1, worker2 etc).
	// NOTE(review): the second argument is the concurrency setting; 0 is
	// presumably "unlimited" - confirm against the machinery documentation.
	w := srv.NewWorker(tag, 0)

	// Custom hooks for error handling and for the start and end of each task,
	// useful for metrics for example.
	w.SetPostTaskHandler(func(sig *tasks.Signature) {
		log.INFO.Println("I am an end of task handler for:", sig.Name)
	})
	w.SetErrorHandler(func(taskErr error) {
		log.ERROR.Println("I am an error handler:", taskErr)
	})
	w.SetPreTaskHandler(func(sig *tasks.Signature) {
		log.INFO.Println("I am a start of task handler for:", sig.Name)
	})

	return w.Launch()
}
// send publishes a single "add" task scheduled five seconds in the future and
// blocks until its result is available.
//
// It sets up a tracer named "sender", builds the server via startServer, sends
// the task inside a "send" tracing span and then waits on the returned
// AsyncResult.
//
// It returns an error when the server cannot be created, the payload cannot
// be marshalled, the task cannot be sent, or the result cannot be fetched.
// A tracer setup failure is reported through log.FATAL.Fatalln instead of
// being returned.
func send() error {
	cleanup, err := tracers.SetupTracer("sender")
	if err != nil {
		log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
	}
	defer cleanup()

	server, err := startServer()
	if err != nil {
		return err
	}

	span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
	defer span.Finish()

	log.INFO.Println("Single task:")

	// Build the binary payload passed as the third task argument. The marshal
	// error is checked instead of being discarded.
	data := mytasks.Data{ID: 1000, Msg: "消息内容"}
	payload, err := data.MarshalBinary()
	if err != nil {
		return fmt.Errorf("Could not marshal task payload: %w", err)
	}

	// The ETA delays execution until five seconds from now (UTC).
	eta := time.Now().UTC().Add(5 * time.Second)
	addTask0 := tasks.Signature{
		Name:       "add",
		ETA:        &eta,
		RetryCount: 3,
		Args: []tasks.Arg{
			{Type: "string", Value: "IDK53434232"},
			{Type: "string", Value: "环行之家"},
			{Type: "[]byte", Value: payload},
		},
	}

	// Send the task and keep the AsyncResult: it is needed below to fetch the
	// outcome (the original discarded it and then referenced an undeclared
	// asyncResult variable).
	asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
	if err != nil {
		return fmt.Errorf("Could not send task: %w", err)
	}

	// Alternatives kept from the original example:
	//   taskSignature, err := server.GetBroker().GetPendingTasks("asong")
	//   taskState := asyncResult.GetState()
	//   fmt.Println(taskState.IsSuccess(), taskState.IsCompleted(), taskState.IsFailure())

	// Block, waiting for the task result. The duration is passed to Get as in
	// the original call.
	results, err := asyncResult.Get(time.Millisecond * 5)
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %w", err)
	}

	log.INFO.Printf("add(...) = %v\n", tasks.HumanReadableResults(results))
	return nil
}
itasks/task.go
package mytasks
import (
"fmt"
"strconv"
"strings"
)
// Data is the example payload carried by the "add" task. Its binary form is
// the text "<Msg>:<ID>".
type Data struct {
	Msg string
	ID  int
}

// MarshalBinary implements the BinaryMarshaler interface.
// It encodes the value as Msg, a colon, and the decimal ID. It never returns
// an error.
func (m *Data) MarshalBinary() ([]byte, error) {
	return []byte(m.Msg + ":" + strconv.Itoa(m.ID)), nil
}

// UnmarshalBinary implements the BinaryUnmarshaler interface.
// It decodes the "<Msg>:<ID>" form produced by MarshalBinary.
//
// The split happens at the LAST colon, because the ID never contains a colon
// while Msg may; splitting at the first colon would break the round-trip for
// any Msg that contains ':'.
//
// It returns an error when data holds no colon or when the ID part is not a
// valid integer. The receiver is left unmodified on error.
func (m *Data) UnmarshalBinary(data []byte) error {
	text := string(data)
	sep := strings.LastIndexByte(text, ':')
	if sep < 0 {
		return fmt.Errorf("invalid data format")
	}

	// Parse before assigning so a bad ID does not leave m half-updated.
	id, err := strconv.Atoi(text[sep+1:])
	if err != nil {
		return err
	}

	m.Msg = text[:sep]
	m.ID = id
	return nil
}
// Add is the demo task registered under the name "add".
//
// It prints the payload as a string, then the two identifiers, and finally
// the running total after each step while summing the integers 0 through 999.
// It returns that total and a nil error.
func Add(appId string, appName string, data []byte) (int64, error) {
	fmt.Println(string(data))
	fmt.Println(appId, appName)

	var total int64
	for n := int64(0); n < 1000; n++ {
		total += n
		fmt.Println(total)
	}
	return total, nil
}
启动一个 worker 进程:将上方 main 函数中的代码替换为如下代码后启动服务,当有新任务时,worker 将会执行该任务。
// main runs the worker and prints any error it returns to standard output.
func main() {
	err := worker()
	if err == nil {
		return
	}
	fmt.Println(err.Error())
}
声明:
本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。