I started playing with Go almost a year ago but never really managed to dive deeper or do anything serious with it. Recently picked it up again, reading and trying out bits of code on and off. Also started this new blog with Hugo (which is written in Go as well). As a language, Go is simple yet performant. I am definitely going to build a few micro services with Go soon.

Having said that, I was wondering what I could use to build a distributed task processing system. What I wanted is something similar to Celery in the Python land. Luckily, I found machinery which is inspired by Celery and has nice APIs to achieve similar results. In this blog post, I am going to demonstrate a simple example.

The source code is available here: masnun/golang-distributed-task-processing

Getting started

Here’s what we’re going to do:

  • There will be at least one worker which will be running and waiting for tasks
  • We will be sending task request from another process
  • We will be using Redis as the message queue
  • Ideally the setup would be distributed, that is the worker might run in a separate machine. But for this example, I will run both the worker and the task sender on the same machine.

Get the dependencies

We need to install machinery first. I am using Glide for dependency management in this project. But that is not compulsory. go get should work fine. So first, let’s install machinery -

go get github.com/RichardKnop/machinery/v1

Writing Task and worker

Workers are processes which keep running, waiting for task requests. Tasks are functions which can be requested and then the workers execute those functions and return the results.

Say we have a task named Say. From some other processes, we would request that the Say task be executed. The worker that will receive the request will find which function is registered as the Say task and then call the function with the received arguments. The result from the function is then stored and can be retrieved by the other parties.

So we first need to write a simple task. We will be writing a function named Say which will accept a name and say hello. So let’s create a directory named worker and inside create a file named hello.go. In the file, let’s define this function:

package main

// Say "Hello World"
func Say(name string) (string, error) {
	return "Hello " + name + "!", nil
}

Please note the function signature. The function must return error as the second return value. Otherwise the library will have issues.

In our case, we will be building a single executable from the worker code. So the package is called main. Now that we have a function, let’s write the worker. Create a file named main.go and put the following contents:

package main

import (
	machinery "github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/errors"
)

func main() {

	var cnf = config.Config{
		Broker:        "redis://127.0.0.1:6379",
		ResultBackend: "redis://127.0.0.1:6379",
	}

	server, err := machinery.NewServer(&cnf)
	if err != nil {
		errors.Fail(err, "Could not create server")
	}

	server.RegisterTask("Say", Say)

	worker := server.NewWorker("worker-1")
	err = worker.Launch()
	if err != nil {
		errors.Fail(err, "Could not launch worker!")
	}

}

The code is quite simple. We create a config object by passing the Broker and ResultBackend values. We are using Redis here and the redis server is running on our machine. Please make sure the redis server is up and running on that address. Otherwise, change the address to point to a running redis instance.

Then we construct a server out of the configuration and register the task with the RegisterTask method. We pass a name and the corresponding function to execute for that task. It becomes simpler if we use the function as the task name. Once the task is registered, we need to create one or more worker processes. Here we create a new worker instance by calling NewWorker method on the server. We pass a worker name so we can identify it later on (for example in the logs). We then Launch the worker. The worker starts up and connects to our redis server. It then subscribes to appropriate channels to start listenning to task requests.

That’s all we need to do to create a task and worker.

Requesting / Sending Tasks

Now from another process (say from a running web application), on a certain ocassion, we want to run a background task. Here we will see how we can send task requests.

In our root directory, let’s create another main.go file and main function to send the tasks.

package main

import (
	machinery "github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/errors"
	"github.com/RichardKnop/machinery/v1/signatures"
)

func main() {

	var cnf = config.Config{
		Broker:        "redis://127.0.0.1:6379",
		ResultBackend: "redis://127.0.0.1:6379",
	}

	server, err := machinery.NewServer(&cnf)
	if err != nil {
		errors.Fail(err, "Can not create server!")
	}

	sayTask := signatures.TaskSignature{
		Name: "Say",
		Args: []signatures.TaskArg{
			signatures.TaskArg{
				Type:  "string",
				Value: "masnun",
			},
		},
	}

	server.SendTask(&sayTask)

}

If you look carefully, up to the server creation, the code is same. We define a config and create a server. Then we define a task signature. We need to define task signatures to request task executions. In the task signature, we need to pass the Name of the task and a list of arguments as Args. The args will be of TaskArg type. Each TaskArg need to set the Type and the Value. These arguments will be passed along to our function when the worker receives this request.

To queue a task, we use the SendTask method and pass a pointer to our TaskSignature.

Tying it out!

Make sure the redis server is running. In case it is not, run it.

redis-server

Once redis is running, build and run the worker.

cd worker
go build
./worker

Once the worker starts up, you should see some messages like these:

machinery: worker.go:27: Launching a worker with the following settings:
machinery: worker.go:28: - Broker: redis://127.0.0.1:6379
machinery: worker.go:29: - ResultBackend: redis://127.0.0.1:6379
machinery: worker.go:30: - Exchange:
machinery: worker.go:31: - ExchangeType:
machinery: worker.go:32: - DefaultQueue:
machinery: worker.go:33: - BindingKey:
machinery: redis.go:86: [*] Waiting for messages. To exit press CTRL+C

Now we need to build the program that will send tasks to the queue. Open a new terminal window and navigate to the project root. Build the main program and run it.

go build -o main
./main

That should queue the task. Now switch to the worker process and check the output. If everything goes right, we will see some output like:

machinery: redis.go:211: Received new message: {"UUID":"task_c39f7e99-df4d-443a-ad21-3481260b34fb","Name":"Say","RoutingKey":"","GroupUUID":"","GroupTaskCount":0,"Args":[{"Type":"string","Value":"masnun"}],"Headers":null,"Immutable":false,"OnSuccess":null,"OnError":null,"ChordCallback":null}
machinery: worker.go:110: Processed task_c39f7e99-df4d-443a-ad21-3481260b34fb. Result = Hello masnun!

Since we are using a ResultBackend too, we can check the state and retrieve the task results.


asyncResult, err := server.SendTask(&sayTask)

taskState := asyncResult.GetState()
fmt.Printf("Current state of %v task is:\n", taskState.TaskUUID)
fmt.Println(taskState.State)

result, err := asyncResult.Get()
fmt.Println(result.Interface())

(My example code on Github does not include this part, it would be a good self practice to try these out ourselves, no?)

The machinery library has some other cool features too. Do checkout the github repo for in depth documentation and code samples.