Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 5

Koray Göçmen
5 min read · Oct 20, 2020
Single scheduler (conductor) and multiple workers

Part 5— The worker

The entry point of the worker is the worker/worker.go file, which contains the init and main functions. main launches two functions asynchronously: one starts the gRPC server and the other registers the worker with the scheduler.

// worker/worker.go
package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
)

var (
    // workerID is the id assigned by the scheduler
    // after registering on the scheduler.
    workerID string
)

func init() {
    loadConfig()
}

// Entry point of the worker application.
func main() {
    go startGRPCServer()
    go registerWorker()

    // Block until an interrupt or termination signal arrives,
    // then deregister from the scheduler and exit.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

    s := <-sig
    fatal(fmt.Sprintf("Signal (%v) received, stopping", s))
}

func fatal(message string) {
    deregisterWorker()
    log.Fatalln(message)
}
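
loadConfig, registerWorker, and deregisterWorker live in other files of the repo and are not shown here. Based only on the config fields used by the gRPC server below (config.GRPCServer.Addr, UseTLS, CrtFile, KeyFile), a minimal sketch of the config side could look like the following; the file name, JSON layout, and loader are my assumptions for illustration, not the repo's actual code.

// worker/config.go (a minimal sketch; assumptions noted above)
package main

import (
    "encoding/json"
    "log"
    "os"
)

// configuration mirrors the fields used elsewhere in the worker:
// the gRPC listen address and the optional TLS settings.
type configuration struct {
    GRPCServer struct {
        Addr    string `json:"addr"`
        UseTLS  bool   `json:"useTLS"`
        CrtFile string `json:"crtFile"`
        KeyFile string `json:"keyFile"`
    } `json:"grpcServer"`
}

var config configuration

// loadConfig reads the worker configuration from a JSON file.
// The file name here is an assumption for illustration.
func loadConfig() {
    f, err := os.Open("config.worker.json")
    if err != nil {
        log.Fatalln("failed to open config file:", err)
    }
    defer f.Close()

    if err := json.NewDecoder(f).Decode(&config); err != nil {
        log.Fatalln("failed to parse config file:", err)
    }
}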

The worker's gRPC server relies on three functions: "startScript", "stopScript", and "queryScript". They run the script files (the "jobs") with the provided command and record each script's output to a file on the worker.

// worker/grpc_server.go
package main

import (
    "context"
    "fmt"
    "log"
    "net"

    pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

// server holds the GRPC worker server instance.
type server struct{}

// StartJob starts a new job with the given command and path.
// The command can be any executable command on the worker and the path
// is the relative path of the script.
func (s *server) StartJob(ctx context.Context, r *pb.StartJobReq) (*pb.StartJobRes, error) {
    jobID, err := startScript(r.Command, r.Path)
    if err != nil {
        return nil, err
    }

    res := pb.StartJobRes{
        JobID: jobID,
    }
    return &res, nil
}

// StopJob stops a running job with the given job id.
func (s *server) StopJob(ctx context.Context, r *pb.StopJobReq) (*pb.StopJobRes, error) {
    if err := stopScript(r.JobID); err != nil {
        return nil, err
    }
    return &pb.StopJobRes{}, nil
}

// QueryJob returns the status of the job with the given job id.
// The status is carried in the `Done` field of the response,
// which is true if the job has finished and false if it is still running.
func (s *server) QueryJob(ctx context.Context, r *pb.QueryJobReq) (*pb.QueryJobRes, error) {
    jobDone, jobError, jobErrorText, err := queryScript(r.JobID)
    if err != nil {
        return nil, err
    }

    res := pb.QueryJobRes{
        Done:      jobDone,
        Error:     jobError,
        ErrorText: jobErrorText,
    }
    return &res, nil
}

// startGRPCServer starts the GRPC server for the worker.
// The scheduler makes gRPC requests to this server to start,
// stop, and query the status of jobs.
func startGRPCServer() {
    lis, err := net.Listen("tcp", config.GRPCServer.Addr)
    if err != nil {
        fatal(fmt.Sprintf("failed to listen: %v", err))
    }

    var opts []grpc.ServerOption
    if config.GRPCServer.UseTLS {
        creds, err := credentials.NewServerTLSFromFile(
            config.GRPCServer.CrtFile,
            config.GRPCServer.KeyFile,
        )
        if err != nil {
            fatal(fmt.Sprint("Failed to generate credentials: ", err))
        }
        opts = []grpc.ServerOption{grpc.Creds(creds)}
    }

    log.Println("GRPC Server listening on", config.GRPCServer.Addr)

    grpcServer := grpc.NewServer(opts...)
    pb.RegisterWorkerServer(grpcServer, &server{})
    if err := grpcServer.Serve(lis); err != nil {
        fatal(fmt.Sprintf("failed to serve: %v", err))
    }
}
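
For context on the other end of this connection, the scheduler talks to this server through the client generated from the same proto definitions. Below is a rough scheduler-side sketch, assuming the generated jobscheduler package exposes the usual NewWorkerClient constructor and using an insecure dial for brevity; the helper name startJobOnWorker is made up for illustration and is not the repo's actual code.

// scheduler-side sketch (illustrative only; assumptions noted above)
package main

import (
    "context"
    "time"

    pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
    "google.golang.org/grpc"
)

// startJobOnWorker dials a worker's gRPC server and asks it to start a job,
// returning the job id the worker assigned.
func startJobOnWorker(workerAddr, command, path string) (string, error) {
    // Insecure dial for brevity; the real scheduler can use TLS credentials
    // mirroring the server options above.
    conn, err := grpc.Dial(workerAddr, grpc.WithInsecure())
    if err != nil {
        return "", err
    }
    defer conn.Close()

    client := pb.NewWorkerClient(conn)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    res, err := client.StartJob(ctx, &pb.StartJobReq{
        Command: command,
        Path:    path,
    })
    if err != nil {
        return "", err
    }
    return res.JobID, nil
}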

job.go contains those three functions to start, stop, and query scripts. Each job is kept in the jobs map, keyed by its job id.

// worker/job.go
package main

import (
    "errors"
    "fmt"
    "os"
    "os/exec"
    "sync"

    "github.com/google/uuid"
)

// jobsMutex is the lock that guards access to the jobs map.
// jobs is the map that holds current and past jobs.
// - key: job id
// - value: pointer to the created job object.
var (
    jobsMutex = &sync.Mutex{}
    jobs      = make(map[string]*job)
)

// job holds information about ongoing or past jobs
// that were triggered by the scheduler.
// - id: UUID assigned by the worker and sent back to the scheduler.
// - command: command the job is run with.
// - path: path to the job file/executable sent by the scheduler.
// - outFilePath: file path where the output of the job is piped.
// - cmd: pointer to the exec.Cmd, used to query job status etc.
// - done: whether the job is done (default false).
// - err: error encountered while running the job (default nil).
type job struct {
    id          string
    command     string
    path        string
    outFilePath string
    cmd         *exec.Cmd
    done        bool
    err         error
}

// startScript starts a new job.
// Returns:
// - string: job id
// - error: nil if no error
func startScript(command, path string) (string, error) {
    jobsMutex.Lock()
    defer jobsMutex.Unlock()

    jobID := uuid.New().String()
    outFilePath := fmt.Sprintf("%s.out", jobID)

    outfile, err := os.Create(outFilePath)
    if err != nil {
        return "", err
    }
    defer outfile.Close()

    cmd := exec.Command(command, path)
    cmd.Stdout = outfile

    if err = cmd.Start(); err != nil {
        return "", err
    }

    newJob := job{
        id:          jobID,
        command:     command,
        path:        path,
        outFilePath: outFilePath,
        cmd:         cmd,
        done:        false,
        err:         nil,
    }
    jobs[jobID] = &newJob

    // Wait for the job asynchronously and record the result on the
    // captured job object (under the mutex, since queryScript reads it).
    go func() {
        err := cmd.Wait()

        jobsMutex.Lock()
        defer jobsMutex.Unlock()
        if err != nil {
            newJob.err = err
        }
        newJob.done = true
    }()

    return jobID, nil
}

// stopScript stops a running job.
// Returns:
// - error: nil if no error
func stopScript(jobID string) error {
    jobsMutex.Lock()
    defer jobsMutex.Unlock()

    job, found := jobs[jobID]
    if !found {
        return errors.New("job not found")
    }

    if job.done {
        return nil
    }

    if err := job.cmd.Process.Kill(); err != nil {
        return err
    }

    return nil
}

// queryScript checks whether a job is done.
// Returns:
// - bool: job status (true if the job is done)
// - bool: job error (true if the job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error
func queryScript(jobID string) (bool, bool, string, error) {
    jobsMutex.Lock()
    defer jobsMutex.Unlock()

    job, found := jobs[jobID]
    if !found {
        return false, false, "", errors.New("job not found")
    }

    var (
        jobDone      = job.done
        jobError     = false
        jobErrorText = ""
    )

    if job.err != nil {
        jobError = true
        jobErrorText = job.err.Error()
    }

    return jobDone, jobError, jobErrorText, nil
}
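
To make the lifecycle of these helpers concrete, here is a hypothetical usage snippet (not part of the repo): start a long-running command, stop it, then query it. Because stopScript kills the process, cmd.Wait returns an error, so the query reports the job as done with an error.

// worker/job_example.go (hypothetical, illustration only)
package main

import (
    "log"
    "time"
)

// exampleJobLifecycle starts a long-running command, stops it,
// then queries its state using the helpers defined in job.go.
func exampleJobLifecycle() {
    jobID, err := startScript("sleep", "30") // runs `sleep 30`, output piped to <jobID>.out
    if err != nil {
        log.Fatalln(err)
    }

    if err := stopScript(jobID); err != nil { // kills the underlying process
        log.Fatalln(err)
    }

    time.Sleep(100 * time.Millisecond) // give the waiting goroutine a moment to record the result

    done, hadErr, errText, _ := queryScript(jobID)
    log.Println(done, hadErr, errText) // typically: true, true, "signal: killed" (on Unix)
}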

The most important piece of this whole project is how the script is started on the worker. The script's result is collected inside an anonymous goroutine launched by "startScript", which closes over the job object it was created for. This way, when the job finishes or an error occurs, I don't need to look the job up by its key or find a way to pass it around; the goroutine can simply update the captured job object, since it runs in the same context.

There is quite a bit I left out, such as the proto file and how it is compiled into the Go package shared between the worker and the scheduler, the config files for both, and the entire Makefile. They can all be found in my GitHub repo, but what I covered here are the most important pieces.

Yeah, that’s pretty much it!

