The Startup
Published in

The Startup

Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 4

Single scheduler (conductor) and multiple workers

Part 4 — The scheduler

// scheduler/scheduler.go
package main

import (
"log"
"os"
"os/signal"
"syscall"
)

// init runs loadConfig (defined elsewhere in the package) before main starts.
// NOTE(review): presumably this populates the package-level config value that
// api reads for the HTTP listen address — confirm against loadConfig.
func init() {
loadConfig()
}

// main is the entry point of the scheduler application.
//
// It launches the HTTP API and the gRPC server in their own goroutines and
// then blocks until SIGINT or SIGTERM is delivered, at which point it logs
// the signal and exits through log.Fatalf.
func main() {
	go api()
	go startGRPCServer()

	// signal.Notify sends without blocking, so the channel needs a buffer of
	// at least one; with an unbuffered channel a signal that arrives while no
	// receiver is ready would be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// log.Fatalf terminates the process, so a single receive is enough.
	s := <-sig
	log.Fatalf("Signal (%d) received, stopping\n", s)
}
// scheduler/api.go
package main

import (
"encoding/json"
"io/ioutil"
"log"
"net/http"

"github.com/julienschmidt/httprouter"
)

// Header name and value that each API handler in this file sets on its
// response before writing the body.
const (
contentTypeHeader = "Content-Type"
applicationJSONHeader = "application/json"
)

// apiStartJob is the handler registered for POST /start.
//
// It decodes the JSON request body into an apiStartJobReq, forwards it to
// startJobOnWorker and replies with JSON:
//   - 201 Created and apiStartJobRes{JobID} on success
//   - 400 Bad Request and apiError when the body is not valid JSON
//   - 500 Internal Server Error and apiError when the body cannot be read
//     or startJobOnWorker fails
func apiStartJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status code followed by v encoded as JSON. An
	// encoding/write failure cannot be reported to the client any more (the
	// header is already sent), so it is logged instead of being dropped.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiStartJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	var startJobReq apiStartJobReq
	if err := json.Unmarshal(body, &startJobReq); err != nil {
		// A payload that does not parse is a client error, not a server fault.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobID, err := startJobOnWorker(startJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusCreated, apiStartJobRes{JobID: jobID})
}

// apiStopJob is the handler registered for POST /stop.
//
// It decodes the JSON request body into an apiStopJobReq, forwards it to
// stopJobOnWorker and replies with JSON:
//   - 200 OK and apiStopJobRes{Success: true} on success
//   - 400 Bad Request and apiError when the body is not valid JSON
//   - 500 Internal Server Error and apiError when the body cannot be read
//     or stopJobOnWorker fails
func apiStopJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status code followed by v encoded as JSON. An
	// encoding/write failure cannot be reported to the client any more (the
	// header is already sent), so it is logged instead of being dropped.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiStopJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	var stopJobReq apiStopJobReq
	if err := json.Unmarshal(body, &stopJobReq); err != nil {
		// A payload that does not parse is a client error, not a server fault.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	if err := stopJobOnWorker(stopJobReq); err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiStopJobRes{Success: true})
}

// apiQueryJob is the handler registered for POST /query.
//
// It decodes the JSON request body into an apiQueryJobReq, forwards it to
// queryJobOnWorker and replies with JSON:
//   - 200 OK and apiQueryJobRes{Done, Error, ErrorText} on success
//   - 400 Bad Request and apiError when the body is not valid JSON
//   - 500 Internal Server Error and apiError when the body cannot be read
//     or queryJobOnWorker fails
func apiQueryJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status code followed by v encoded as JSON. An
	// encoding/write failure cannot be reported to the client any more (the
	// header is already sent), so it is logged instead of being dropped.
	respond := func(status int, v interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(v); err != nil {
			log.Printf("apiQueryJob: writing response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	var queryJobReq apiQueryJobReq
	if err := json.Unmarshal(body, &queryJobReq); err != nil {
		// A payload that does not parse is a client error, not a server fault.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobDone, jobError, jobErrorText, err := queryJobOnWorker(queryJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiQueryJobRes{
		Done:      jobDone,
		Error:     jobError,
		ErrorText: jobErrorText,
	})
}

// createRouter returns a new httprouter.Router with the three job endpoints
// registered as POST routes: /start, /stop and /query.
func createRouter() *httprouter.Router {
	r := httprouter.New()

	// Route table, registered in the order listed.
	postRoutes := []struct {
		path   string
		handle httprouter.Handle
	}{
		{"/start", apiStartJob},
		{"/stop", apiStopJob},
		{"/query", apiQueryJob},
	}
	for _, rt := range postRoutes {
		r.POST(rt.path, rt.handle)
	}

	return r
}

// api serves the scheduler's HTTP endpoints on config.HTTPServer.Addr using
// the router built by createRouter. It blocks for the lifetime of the server
// and terminates the process through log.Fatal once ListenAndServe returns.
func api() {
	server := &http.Server{
		Addr:    config.HTTPServer.Addr,
		Handler: createRouter(),
	}

	log.Println("HTTP Server listening on", config.HTTPServer.Addr)

	// ListenAndServe always returns a non-nil error, so its result can be
	// handed to log.Fatal directly.
	log.Fatal(server.ListenAndServe())
}
// scheduler/grpc_translator.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the HTTP start request into a gRPC StartJob
// call on the worker identified by req.WorkerID.
//
// Returns:
//   - string: job id reported by the worker ("" on error)
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the HTTP stop request into a gRPC StopJob call
// on the worker identified by req.WorkerID, passing req.JobID.
//
// Returns:
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func stopJobOnWorker(req apiStopJobReq) error {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
		return err
	}

	return nil
}

// queryJobOnWorker translates the HTTP query request into a gRPC QueryJob
// call on the worker identified by req.WorkerID, passing req.JobID.
//
// Returns (copied from the worker's response on success, zero values on error):
//   - bool: job status (true if job is done)
//   - bool: job error (true if job had an error)
//   - string: job error text
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}
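// scheduler/grpc_server.go
// NOTE(review): the listing that follows is a verbatim repeat of
// scheduler/grpc_translator.go. The gRPC server code itself (startGRPCServer,
// the workers map and workersMutex) does not appear in this listing.
package main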

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the HTTP start request into a gRPC StartJob
// call on the worker identified by req.WorkerID.
//
// Returns:
//   - string: job id reported by the worker ("" on error)
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the HTTP stop request into a gRPC StopJob call
// on the worker identified by req.WorkerID, passing req.JobID.
//
// Returns:
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func stopJobOnWorker(req apiStopJobReq) error {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
		return err
	}

	return nil
}

// queryJobOnWorker translates the HTTP query request into a gRPC QueryJob
// call on the worker identified by req.WorkerID, passing req.JobID.
//
// Returns (copied from the worker's response on success, zero values on error):
//   - bool: job status (true if job is done)
//   - bool: job error (true if job had an error)
//   - string: job error text
//   - error: "worker not found" when req.WorkerID is not in workers,
//     otherwise the dial or RPC error; nil on success
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Hold workersMutex only for the map lookup and copy the address out
	// while locked. Keeping the lock across the dial and RPC would serialise
	// every request for up to the one-second call timeout below.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	// Bound the RPC to one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

--

--

Get smarter at building your thing. Follow to join The Startup’s +8 million monthly readers & +760K followers.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Koray Göçmen

University of Toronto, Computer Engineering, architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.