Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 5

Single scheduler (conductor) and multiple workers

Part 5 — The worker

// worker/worker.go

package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
)

// workerID is the id the scheduler assigns to this worker
// once the worker has registered with it.
var workerID string

// init runs before main and loads the worker configuration
// by calling loadConfig.
func init() {
loadConfig()
}

// main is the entry point of the worker application.
//
// It launches the GRPC server and the scheduler registration in background
// goroutines, then blocks until SIGINT or SIGTERM arrives and hands the
// shutdown over to fatal.
func main() {
	go startGRPCServer()
	go registerWorker()

	// signal.Notify never blocks while delivering, so a signal that arrives
	// when nobody is receiving on an unbuffered channel is dropped. A buffer
	// of one is enough for the single notification awaited here.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// fatal ends in log.Fatalln and therefore never returns, so a single
	// receive is sufficient; no loop is needed.
	s := <-sig
	fatal(fmt.Sprintf("Signal (%d) received, stopping\n", s))
}

// fatal deregisters the worker by calling deregisterWorker and then logs
// message with log.Fatalln, which terminates the process.
// It does not return.
func fatal(message string) {
deregisterWorker()
log.Fatalln(message)
}
// worker/grpc_server.go

package main

import (
"context"
"fmt"
"log"
"net"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// server is the receiver type for the worker's GRPC handlers
// (StartJob, StopJob and QueryJob). It has no fields.
type server struct{}

// StartJob launches a new job on the worker.
// r.Command is the executable to run and r.Path is the script path that is
// handed to it; both are forwarded to startScript.
// On success the response carries the id of the newly started job.
func (s *server) StartJob(ctx context.Context, r *pb.StartJobReq) (*pb.StartJobRes, error) {
	id, startErr := startScript(r.Command, r.Path)
	if startErr != nil {
		return nil, startErr
	}

	return &pb.StartJobRes{JobID: id}, nil
}

// StopJob stops the job identified by r.JobID by delegating to stopScript.
// Any error from stopScript is returned to the caller unchanged.
func (s *server) StopJob(ctx context.Context, r *pb.StopJobReq) (*pb.StopJobRes, error) {
	stopErr := stopScript(r.JobID)
	if stopErr != nil {
		return nil, stopErr
	}

	return new(pb.StopJobRes), nil
}

// QueryJob reports the status of the job identified by r.JobID.
// In the response, `Done` is true once the job has finished and false while
// it is still running; `Error` is true when the job ended with an error, in
// which case `ErrorText` holds the error message.
func (s *server) QueryJob(ctx context.Context, r *pb.QueryJobReq) (*pb.QueryJobRes, error) {
	done, failed, failText, queryErr := queryScript(r.JobID)
	if queryErr != nil {
		return nil, queryErr
	}

	return &pb.QueryJobRes{
		Done:      done,
		Error:     failed,
		ErrorText: failText,
	}, nil
}

// startGRPCServer starts the GRPC server for the worker and blocks while it
// is serving. The scheduler can make GRPC requests to this server to start,
// stop and query the status of jobs.
//
// The listen address and the optional TLS certificate/key files are read from
// config.GRPCServer. Any failure (listening, loading credentials, serving) is
// reported through fatal, which terminates the process.
func startGRPCServer() {
	lis, err := net.Listen("tcp", config.GRPCServer.Addr)
	if err != nil {
		fatal(fmt.Sprintf("failed to listen: %v", err))
	}

	var opts []grpc.ServerOption
	if config.GRPCServer.UseTLS {
		creds, err := credentials.NewServerTLSFromFile(
			config.GRPCServer.CrtFile,
			config.GRPCServer.KeyFile,
		)
		if err != nil {
			// fmt.Sprint inserts no space after a string operand, so the
			// error used to be glued directly onto the message text.
			fatal(fmt.Sprintf("Failed to generate credentials: %v", err))
		}
		opts = append(opts, grpc.Creds(creds))
	}

	log.Println("GRPC Server listening on", config.GRPCServer.Addr)

	grpcServer := grpc.NewServer(opts...)
	pb.RegisterWorkerServer(grpcServer, &server{})

	// Serve only returns when the server stops; surface the reason instead
	// of letting this goroutine end silently with the worker still running.
	if err := grpcServer.Serve(lis); err != nil {
		fatal(fmt.Sprintf("failed to serve: %v", err))
	}
}
// worker/job.go

package main

import (
"errors"
"fmt"
"os"
"os/exec"
"sync"

"github.com/google/uuid"
)

// jobsMutex is the lock that guards access to the jobs map.
//
// jobs is the map of current and past jobs:
//   - key: job id
//   - value: pointer to the created job object.
var (
	jobsMutex = new(sync.Mutex)
	jobs      = map[string]*job{}
)

// job holds information about an ongoing or past job
// that was triggered by the scheduler.
// - id: UUID assigned by the worker and sent back to the scheduler.
// - command: command the scheduler asked the job to be run with.
// - path: path to the job file/executable sent by the scheduler.
// - outFilePath: path of the file the job's standard output is written to.
// - cmd: pointer to the exec.Cmd of the started process, used to wait on
//   it and to kill it.
// - done: whether the job has finished (default false).
// - err: error returned while waiting for the job (default nil).
type job struct {
id string
command string
path string
outFilePath string
cmd *exec.Cmd
done bool
err error
}

// startScript start a new job.
// Returns:
// - string: job id
// - error: nil if no error
func startScript(command, path string) (string, error) {
jobsMutex.Lock()
defer jobsMutex.Unlock()

jobID := uuid.New().String()
outFilePath := fmt.Sprintf("%s.out", jobID)

outfile, err := os.Create(outFilePath)
if err != nil {
return "", err
}
defer outfile.Close()

cmd := exec.Command(command, path)
cmd.Stdout = outfile

if err = cmd.Start(); err != nil {
return "", err
}

newJob := job{
id: jobID,
command: command,
path: path,
outFilePath: outFilePath,
cmd: cmd,
done: false,
err: nil,
}
jobs[jobID] = &newJob

// Get the status of the job async.
go func() {
if err := cmd.Wait(); err != nil {
newJob.err = err
}
newJob.done = true
}()

return jobID, nil
}

// stopScript stops a running job by killing its process.
// A job that is already marked done is left alone and reported as success.
// Returns:
// - error: nil if no error; "job not found" when the id is unknown,
//   otherwise whatever killing the process returned.
func stopScript(jobID string) error {
	jobsMutex.Lock()
	defer jobsMutex.Unlock()

	target, ok := jobs[jobID]
	switch {
	case !ok:
		return errors.New("job not found")
	case target.done:
		return nil
	}

	return target.cmd.Process.Kill()
}

// queryScript looks up a job and reports its recorded state.
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error; "job not found" when the id is unknown
func queryScript(jobID string) (bool, bool, string, error) {
	jobsMutex.Lock()
	defer jobsMutex.Unlock()

	entry, ok := jobs[jobID]
	if !ok {
		return false, false, "", errors.New("job not found")
	}

	if entry.err == nil {
		return entry.done, false, "", nil
	}

	return entry.done, true, entry.err.Error(), nil
}

University of Toronto, Computer Engineering, architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store