Published in The Startup

Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 5

Single scheduler (conductor) and multiple workers

Part 5 — The worker

// worker/worker.go
package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
)

// workerID is the identifier the scheduler hands back to this worker once
// it has registered with the scheduler.
var workerID string

// init runs loadConfig before main starts, presumably so that the package
// configuration (e.g. config.GRPCServer read by startGRPCServer) is populated
// before any goroutine uses it — TODO confirm against loadConfig.
func init() {
	loadConfig()
}

// Entry point of the worker application.
//
// main starts the worker's gRPC server and runs registerWorker, each in its
// own goroutine, then blocks until SIGINT or SIGTERM is received. On a
// signal it calls fatal, which deregisters the worker and exits the process.
func main() {
	go startGRPCServer()
	go registerWorker()

	// signal.Notify never blocks when delivering a signal, so an unbuffered
	// channel can drop a signal that arrives before the receive below is
	// ready. A buffer of one guarantees the first signal is kept.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// fatal terminates the process, so there is never a second iteration to
	// wait for; a single receive is enough.
	s := <-sig
	// No trailing "\n": log.Fatalln (used by fatal) already appends one.
	fatal(fmt.Sprintf("Signal (%d) received, stopping", s))
}

// fatal deregisters the worker, then logs msg (plus a newline) through the
// standard logger and terminates the process with exit status 1.
//
// The exit happens via os.Exit, so deferred calls never run; that is why the
// deregistration is performed explicitly before logging rather than deferred.
func fatal(msg string) {
	deregisterWorker()
	log.Println(msg)
	os.Exit(1)
}
// worker/grpc_server.go
package main

import (
"context"
"fmt"
"log"
"net"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// server holds the GRPC worker server instance.
// It carries no state of its own: StartJob, StopJob and QueryJob delegate to
// the package-level helpers startScript, stopScript and queryScript, and
// startGRPCServer registers a *server via pb.RegisterWorkerServer.
type server struct{}

// StartJob handles the scheduler's request to launch a new job.
// r.Command may be any executable available on the worker; r.Path (the
// script's path, relative to the worker's working directory) is passed to it
// as its argument. The generated job id is returned so the scheduler can
// later stop or query the job; a launch failure is returned as the error.
func (s *server) StartJob(ctx context.Context, r *pb.StartJobReq) (*pb.StartJobRes, error) {
	id, err := startScript(r.Command, r.Path)
	if err != nil {
		return nil, err
	}
	return &pb.StartJobRes{JobID: id}, nil
}

// StopJob handles the scheduler's request to stop the job identified by
// r.JobID. An unknown id or a failure to kill the process is returned as the
// error; stopping a job that has already finished succeeds.
func (s *server) StopJob(ctx context.Context, r *pb.StopJobReq) (*pb.StopJobRes, error) {
	err := stopScript(r.JobID)
	if err == nil {
		return &pb.StopJobRes{}, nil
	}
	return nil, err
}

// QueryJob returns the status of the job with the given job id.
//
// In the response:
//   - Done is true once the job's process has exited, and false while it is
//     still running.
//   - Error is true when waiting on the process reported an error (e.g. a
//     non-zero exit status or the job having been killed by StopJob).
//   - ErrorText carries that error's message; it is "" when Error is false.
//
// An unknown job id is returned as the error.
func (s *server) QueryJob(ctx context.Context, r *pb.QueryJobReq) (*pb.QueryJobRes, error) {
	done, failed, errText, err := queryScript(r.JobID)
	if err != nil {
		return nil, err
	}

	return &pb.QueryJobRes{
		Done:      done,
		Error:     failed,
		ErrorText: errText,
	}, nil
}

// startGRPCServer starts the GRPC server for the worker on
// config.GRPCServer.Addr. When config.GRPCServer.UseTLS is set, the server
// uses the certificate and key from config.GRPCServer.CrtFile/KeyFile.
// Scheduler can make grpc requests to this server to start, stop and query
// the status of jobs.
//
// It blocks while serving. Listen, credential and serve errors are all passed
// to fatal, which deregisters the worker and terminates the process.
func startGRPCServer() {
	lis, err := net.Listen("tcp", config.GRPCServer.Addr)
	if err != nil {
		fatal(fmt.Sprintf("failed to listen: %v", err))
	}

	var opts []grpc.ServerOption
	if config.GRPCServer.UseTLS {
		creds, err := credentials.NewServerTLSFromFile(
			config.GRPCServer.CrtFile,
			config.GRPCServer.KeyFile,
		)
		if err != nil {
			// Sprintf with an explicit separator: fmt.Sprint inserts no space
			// after a string operand, which glued the error onto the message.
			fatal(fmt.Sprintf("failed to generate credentials: %v", err))
		}
		opts = append(opts, grpc.Creds(creds))
	}

	log.Println("GRPC Server listening on", config.GRPCServer.Addr)

	grpcServer := grpc.NewServer(opts...)
	pb.RegisterWorkerServer(grpcServer, &server{})

	// Serve only returns when the listener fails (or the server is stopped);
	// previously that error was silently dropped and the worker kept running
	// without a server.
	if err := grpcServer.Serve(lis); err != nil {
		fatal(fmt.Sprintf("failed to serve: %v", err))
	}
}
// worker/job.go
package main

import (
"errors"
"fmt"
"os"
"os/exec"
"sync"

"github.com/google/uuid"
)

// Job registry shared by the gRPC handlers.
//   - jobsMutex guards access to the jobs map.
//   - jobs maps a job id to the *job created for it. Nothing in this file
//     removes entries, so it holds both running and finished jobs.
var (
	jobsMutex = new(sync.Mutex)
	jobs      = map[string]*job{}
)

// job records one job launched on behalf of the scheduler, whether it is
// still running or has already finished.
type job struct {
	// id is the UUID generated by the worker and sent back to the scheduler.
	id string
	// command is the executable the scheduler asked the job to run with.
	command string
	// path is the job file/executable path sent by the scheduler; it is
	// passed to command as its argument.
	path string
	// outFilePath is the file the job's standard output is written to.
	outFilePath string
	// cmd is the *exec.Cmd of the job's process, used to wait on and kill it.
	cmd *exec.Cmd
	// done reports whether the process has exited (false initially).
	done bool
	// err is the error reported while running the job (nil initially and on
	// success).
	err error
}

// startScript starts a new job by running `command path` in the background
// and registering it in the jobs map.
//
// The process's standard output is written to "<jobID>.out" in the worker's
// current working directory. A goroutine waits for the process to exit and
// then records done/err on the job while holding jobsMutex, the same lock
// under which stopScript and queryScript read those fields.
//
// Returns:
// - string: job id
// - error: nil if no error
func startScript(command, path string) (string, error) {
	jobsMutex.Lock()
	defer jobsMutex.Unlock()

	jobID := uuid.New().String()
	outFilePath := fmt.Sprintf("%s.out", jobID)

	outfile, err := os.Create(outFilePath)
	if err != nil {
		return "", err
	}

	cmd := exec.Command(command, path)
	cmd.Stdout = outfile
	startErr := cmd.Start()

	// Once Start has returned, the child (if any) holds its own copy of the
	// descriptor, so the parent's handle is not needed either way. This
	// process never writes to the file, so a close error is irrelevant.
	_ = outfile.Close()

	if startErr != nil {
		// Nothing will ever write to the output file; don't leave an empty
		// file behind. Best-effort: the start error is what matters here.
		_ = os.Remove(outFilePath)
		return "", startErr
	}

	newJob := &job{
		id:          jobID,
		command:     command,
		path:        path,
		outFilePath: outFilePath,
		cmd:         cmd,
	}
	jobs[jobID] = newJob

	// Get the status of the job async. done/err must be written under
	// jobsMutex: queryScript and stopScript read them under that lock from
	// other goroutines, and writing them unlocked is a data race.
	go func() {
		waitErr := cmd.Wait()

		jobsMutex.Lock()
		defer jobsMutex.Unlock()
		newJob.err = waitErr
		newJob.done = true
	}()

	return jobID, nil
}

// stopScript kills the process of a running job. The job's done flag is not
// set here; the goroutine waiting on the process sets it after the exit.
// Returns:
// - error: "job not found" for an unknown id; nil if the job is already done
//   or the kill succeeded; otherwise the error from killing the process.
func stopScript(jobID string) error {
	jobsMutex.Lock()
	defer jobsMutex.Unlock()

	j, ok := jobs[jobID]
	switch {
	case !ok:
		return errors.New("job not found")
	case j.done:
		return nil
	default:
		return j.cmd.Process.Kill()
	}
}

// queryScript reports whether a job has finished and whether it failed.
// Returns:
// - bool: job status (true if the job's process has exited)
// - bool: job error (true if waiting on the job reported an error)
// - string: job error text ("" when there was no error)
// - error: "job not found" for an unknown id, otherwise nil
func queryScript(jobID string) (bool, bool, string, error) {
	jobsMutex.Lock()
	defer jobsMutex.Unlock()

	j, ok := jobs[jobID]
	if !ok {
		return false, false, "", errors.New("job not found")
	}

	if j.err == nil {
		return j.done, false, "", nil
	}
	return j.done, true, j.err.Error(), nil
}

--

--

Get smarter at building your thing. Follow to join The Startup’s +8 million monthly readers & +768K followers.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Koray Göçmen

University of Toronto, Computer Engineering, architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.