Single Scheduler — Multiple Worker Architecture with gRPC and Go — Part 4

[Figure: Single scheduler (conductor) and multiple workers]

Part 4 — The scheduler

// scheduler/scheduler.go
package main

import (
"log"
"os"
"os/signal"
"syscall"
)

// init runs before main and calls loadConfig (defined elsewhere), which
// presumably populates config — api reads config.HTTPServer.Addr, so this
// must happen before main starts the servers.
func init() {
	loadConfig()
}

// Entry point of the scheduler application.
//
// main starts the HTTP API and the gRPC server in background goroutines and
// then blocks until SIGINT or SIGTERM is received. It then logs the signal
// and exits through log.Fatalf, i.e. with exit status 1.
func main() {
	go api()
	go startGRPCServer()

	// signal.Notify never blocks when delivering a signal, so the channel must
	// have buffer space; with an unbuffered channel a signal that arrives
	// while nobody is receiving is silently dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	s := <-sig
	log.Fatalf("Signal (%d) received, stopping\n", s)
}
// scheduler/api.go
package main

import (
	"encoding/json"
	"io/ioutil"
	"log"
	"net/http"
	"time"

	"github.com/julienschmidt/httprouter"
)

// Header name/value pair set on every API response, since all handlers
// reply with a JSON body.
const (
	// contentTypeHeader is the HTTP header that declares the body's media type.
	contentTypeHeader = "Content-Type"
	// applicationJSONHeader is the media type used for all API responses.
	applicationJSONHeader = "application/json"
)

// apiStartJob handles POST /start. It decodes an apiStartJobReq from the
// request body, asks the selected worker to start the job over gRPC (via
// startJobOnWorker) and replies 201 Created with an apiStartJobRes holding
// the new job id.
//
// Failures are reported as an apiError JSON body with status:
//   - 400 Bad Request when the body is not valid JSON for apiStartJobReq
//   - 500 Internal Server Error when the body cannot be read or the worker
//     call fails
func apiStartJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes status followed by v as JSON. The status line is already
	// on the wire when encoding runs, so an encode failure can only be logged.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiStartJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	// A body that does not decode is the client's mistake, not a server fault.
	startJobReq := apiStartJobReq{}
	if err := json.Unmarshal(body, &startJobReq); err != nil {
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobID, err := startJobOnWorker(startJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusCreated, apiStartJobRes{JobID: jobID})
}

// apiStopJob handles POST /stop. It decodes an apiStopJobReq from the request
// body, asks the selected worker to stop the job over gRPC (via
// stopJobOnWorker) and replies 200 OK with an apiStopJobRes whose Success
// field is true.
//
// Failures are reported as an apiError JSON body with status:
//   - 400 Bad Request when the body is not valid JSON for apiStopJobReq
//   - 500 Internal Server Error when the body cannot be read or the worker
//     call fails
func apiStopJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes status followed by v as JSON. The status line is already
	// on the wire when encoding runs, so an encode failure can only be logged.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiStopJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	// A body that does not decode is the client's mistake, not a server fault.
	stopJobReq := apiStopJobReq{}
	if err := json.Unmarshal(body, &stopJobReq); err != nil {
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	if err := stopJobOnWorker(stopJobReq); err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiStopJobRes{Success: true})
}

// apiQueryJob handles POST /query. It decodes an apiQueryJobReq from the
// request body, asks the selected worker for the job's status over gRPC (via
// queryJobOnWorker) and replies 200 OK with an apiQueryJobRes carrying the
// done flag, error flag and error text.
//
// Failures are reported as an apiError JSON body with status:
//   - 400 Bad Request when the body is not valid JSON for apiQueryJobReq
//   - 500 Internal Server Error when the body cannot be read or the worker
//     call fails
func apiQueryJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes status followed by v as JSON. The status line is already
	// on the wire when encoding runs, so an encode failure can only be logged.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiQueryJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	// A body that does not decode is the client's mistake, not a server fault.
	queryJobReq := apiQueryJobReq{}
	if err := json.Unmarshal(body, &queryJobReq); err != nil {
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobDone, jobError, jobErrorText, err := queryJobOnWorker(queryJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiQueryJobRes{
		Done:      jobDone,
		Error:     jobError,
		ErrorText: jobErrorText,
	})
}

// createRouter builds the scheduler's HTTP router. Every endpoint is a POST
// whose request arguments travel in a JSON body:
//
//	/start -> apiStartJob
//	/stop  -> apiStopJob
//	/query -> apiQueryJob
func createRouter() *httprouter.Router {
	routes := []struct {
		path    string
		handler httprouter.Handle
	}{
		{"/start", apiStartJob},
		{"/stop", apiStopJob},
		{"/query", apiQueryJob},
	}

	rt := httprouter.New()
	for _, route := range routes {
		rt.POST(route.path, route.handler)
	}
	return rt
}

// api serves the scheduler's HTTP API (routes from createRouter) on
// config.HTTPServer.Addr. It blocks for as long as the server runs, and any
// error returned by ListenAndServe terminates the process via log.Fatal.
func api() {
	// Bound every phase of a connection. Without these limits a slow or
	// stalled client can keep a connection, and the goroutine serving it,
	// open indefinitely. The values are generous next to the 1s timeout the
	// gRPC calls behind each handler use.
	srv := &http.Server{
		Addr:              config.HTTPServer.Addr,
		Handler:           createRouter(),
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	log.Println("HTTP Server listening on", config.HTTPServer.Addr)
	if err := srv.ListenAndServe(); err != nil {
		log.Fatal(err)
	}
}
// scheduler/grpc_translator.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates an HTTP start request into a StartJob gRPC
// call on the worker registered under req.WorkerID.
// Returns:
// - string: job id reported by the worker
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func startJobOnWorker(req apiStartJobReq) (string, error) {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return "", errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	// NOTE(review): a fresh connection is dialed per request; caching one
	// *grpc.ClientConn per worker would avoid repeated connection setup.
	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.StartJob(ctx, &pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	})
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates an HTTP stop request into a StopJob gRPC call
// on the worker registered under req.WorkerID.
// Returns:
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func stopJobOnWorker(req apiStopJobReq) error {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_, err = c.StopJob(ctx, &pb.StopJobReq{
		JobID: req.JobID,
	})
	return err
}

// queryJobOnWorker translates an HTTP query request into a QueryJob gRPC
// call on the worker registered under req.WorkerID.
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return false, false, "", errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.QueryJob(ctx, &pb.QueryJobReq{
		JobID: req.JobID,
	})
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}
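// scheduler/grpc_server.go
// NOTE: the listing below repeats grpc_translator.go verbatim (presumably a
// copy-paste slip in the original article); the gRPC server itself, including
// startGRPCServer called from main, is not shown here.
package main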

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates an HTTP start request into a StartJob gRPC
// call on the worker registered under req.WorkerID.
// Returns:
// - string: job id reported by the worker
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func startJobOnWorker(req apiStartJobReq) (string, error) {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return "", errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.StartJob(ctx, &pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	})
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates an HTTP stop request into a StopJob gRPC call
// on the worker registered under req.WorkerID.
// Returns:
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func stopJobOnWorker(req apiStopJobReq) error {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_, err = c.StopJob(ctx, &pb.StopJobReq{
		JobID: req.JobID,
	})
	return err
}

// queryJobOnWorker translates an HTTP query request into a QueryJob gRPC
// call on the worker registered under req.WorkerID.
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error; "worker not found" if req.WorkerID is unknown,
// otherwise the dial or RPC error
//
// workersMutex guards only the registry lookup. It is released before the
// dial and the RPC, so a slow or unreachable worker cannot hold the lock
// (and stall every other caller of workersMutex) for up to the RPC timeout.
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if !ok {
		workersMutex.Unlock()
		return false, false, "", errors.New("worker not found")
	}
	// Copy the address so no shared state is touched after unlocking.
	addr := worker.addr
	workersMutex.Unlock()

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC so an unresponsive worker fails the request after 1s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	r, err := c.QueryJob(ctx, &pb.QueryJobReq{
		JobID: req.JobID,
	})
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

University of Toronto, Computer Engineering. Architected and implemented reliable infrastructure and worked as the lead developer for multiple startups.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store