Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 4

Koray Göçmen
5 min readOct 20, 2020

--

Single scheduler (conductor) and multiple workers

Part 4 — The scheduler

The entry point of the scheduler is the scheduler/scheduler.go file, which contains the init and main functions. The main function launches two functions asynchronously (as goroutines): one starts the HTTP server (the API) and the other starts the GRPC server.

// scheduler/scheduler.go
package main

import (
"log"
"os"
"os/signal"
"syscall"
)

// init runs before main (Go executes every init function first) and calls
// loadConfig, which is defined elsewhere in the package.
// NOTE(review): presumably loadConfig populates the package-level config
// value that api reads — confirm against its definition.
func init() {
loadConfig()
}

// Entry point of the scheduler application.
//
// main starts the HTTP API (api) and the GRPC server (startGRPCServer) in
// their own goroutines and then blocks until SIGINT or SIGTERM is delivered,
// at which point it logs the signal and terminates via log.Fatalf.
func main() {
	go api()
	go startGRPCServer()

	// signal.Notify never blocks when delivering a signal, so the channel
	// needs a buffer of one; with an unbuffered channel a signal that
	// arrives before the receive below is ready would be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// log.Fatalf exits the process, so a single receive is sufficient and
	// no surrounding loop is needed.
	s := <-sig
	log.Fatalf("Signal (%d) received, stopping\n", s)
}

The API uses Julien Schmidt’s HTTP router: github.com/julienschmidt/httprouter

The API exposes three endpoints to start, query, and stop jobs on a worker. The handlers themselves are fairly standard and not important; the important parts are the three functions that do all the work: “startJobOnWorker”, “stopJobOnWorker”, and “queryJobOnWorker”.

// scheduler/api.go
package main

import (
"encoding/json"
"io/ioutil"
"log"
"net/http"

"github.com/julienschmidt/httprouter"
)

// Header name/value pair that every API handler in this file sets on its
// response before writing a JSON body.
const (
// contentTypeHeader is the name of the HTTP header being set.
contentTypeHeader = "Content-Type"
// applicationJSONHeader is the value stored under contentTypeHeader.
applicationJSONHeader = "application/json"
)

// apiStartJob is the HTTP handler for starting a job. It reads the request
// body, decodes it as JSON into an apiStartJobReq, passes it to
// startJobOnWorker and replies with the resulting job id.
//
// Responses (always JSON):
//   - 201 Created with apiStartJobRes on success
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or startJobOnWorker fails
func apiStartJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON-encoded payload.
	// Once the header is sent an encoding/write failure can no longer be
	// reported to the client, so it is logged instead of being dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiStartJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	startJobReq := apiStartJobReq{}
	if err := json.Unmarshal(body, &startJobReq); err != nil {
		// Malformed JSON is the caller's mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobID, err := startJobOnWorker(startJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusCreated, apiStartJobRes{JobID: jobID})
}

// apiStopJob is the HTTP handler for stopping a job. It reads the request
// body, decodes it as JSON into an apiStopJobReq and passes it to
// stopJobOnWorker.
//
// Responses (always JSON):
//   - 200 OK with apiStopJobRes{Success: true} on success
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or stopJobOnWorker fails
func apiStopJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON-encoded payload.
	// Once the header is sent an encoding/write failure can no longer be
	// reported to the client, so it is logged instead of being dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiStopJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	stopJobReq := apiStopJobReq{}
	if err := json.Unmarshal(body, &stopJobReq); err != nil {
		// Malformed JSON is the caller's mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	if err := stopJobOnWorker(stopJobReq); err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiStopJobRes{Success: true})
}

// apiQueryJob is the HTTP handler for querying a job. It reads the request
// body, decodes it as JSON into an apiQueryJobReq, passes it to
// queryJobOnWorker and replies with the three values it reports.
//
// Responses (always JSON):
//   - 200 OK with apiQueryJobRes (Done, Error, ErrorText) on success
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or queryJobOnWorker fails
func apiQueryJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON-encoded payload.
	// Once the header is sent an encoding/write failure can no longer be
	// reported to the client, so it is logged instead of being dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiQueryJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	queryJobReq := apiQueryJobReq{}
	if err := json.Unmarshal(body, &queryJobReq); err != nil {
		// Malformed JSON is the caller's mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobDone, jobError, jobErrorText, err := queryJobOnWorker(queryJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiQueryJobRes{
		Done:      jobDone,
		Error:     jobError,
		ErrorText: jobErrorText,
	})
}

// createRouter builds the httprouter instance that serves the scheduler
// API. Each endpoint is registered for the POST method only.
func createRouter() *httprouter.Router {
	// Route table: request path -> handler, registered in this order.
	routes := []struct {
		path   string
		handle httprouter.Handle
	}{
		{"/start", apiStartJob},
		{"/stop", apiStopJob},
		{"/query", apiQueryJob},
	}

	r := httprouter.New()
	for _, route := range routes {
		r.POST(route.path, route.handle)
	}
	return r
}

// api runs the scheduler's HTTP server on config.HTTPServer.Addr using the
// routes from createRouter. It blocks while the server is running and
// terminates the process via log.Fatal if ListenAndServe returns an error.
func api() {
	addr := config.HTTPServer.Addr

	server := http.Server{
		Addr:    addr,
		Handler: createRouter(),
	}

	log.Println("HTTP Server listening on", addr)

	err := server.ListenAndServe()
	if err != nil {
		log.Fatal(err)
	}
}

Those three functions are located in the grpc_translator.go file; they translate HTTP requests into GRPC requests to the specified worker.

// scheduler/grpc_translator.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to a grpc
// StartJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - string: job id taken from the worker's reply ("" on error)
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to a grpc
// StopJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func stopJobOnWorker(req apiStopJobReq) error {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	// The reply carries nothing the caller needs; only the error matters.
	_, err = c.StopJob(ctx, &stopJobReq)
	return err
}

// queryJobOnWorker translates the http query request to a grpc
// QueryJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

The GRPC translator calls worker functions to create and delete workers. These two functions simply assign an ID to a new worker and add it to the workers map, or remove an existing worker from the map. Pretty standard. I am too bored to add them here.

Finally, the last important bit is the GRPC server of the scheduler, which the workers use to register and deregister themselves with the scheduler.

// scheduler/grpc_server.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to a grpc
// StartJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - string: job id taken from the worker's reply ("" on error)
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to a grpc
// StopJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func stopJobOnWorker(req apiStopJobReq) error {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	// The reply carries nothing the caller needs; only the error matters.
	_, err = c.StopJob(ctx, &stopJobReq)
	return err
}

// queryJobOnWorker translates the http query request to a grpc
// QueryJob request on the worker identified by req.WorkerID.
//
// workersMutex is held only while the worker's address is looked up and is
// released before any network I/O, so a slow or unreachable worker does not
// block other callers that need workersMutex.
//
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: "worker not found" if req.WorkerID is not in workers, the
//   dial or RPC error if the call fails, nil otherwise
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Copy the address out under the lock; nothing below touches workers.
	addr, ok := func() (string, bool) {
		workersMutex.Lock()
		defer workersMutex.Unlock()

		worker, found := workers[req.WorkerID]
		if !found {
			return "", false
		}
		return worker.addr, true
	}()
	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

Alright, the worker is going to be slightly more complicated, but hopefully it will be the last piece.

--

--

Koray Göçmen

University of Toronto, Computer Engineering, architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.