The Startup
Published in

The Startup

Single Scheduler — Multiple Worker Architecture with GRPC and Go — Part 4

Single scheduler (conductor) and multiple workers

Part 4 — The scheduler

The entry point of the scheduler is the scheduler/scheduler.go file, which contains the init and main functions. The main function starts two goroutines: one runs the HTTP server (API) and the other runs the GRPC server.

// scheduler/scheduler.go
package main

import (
"log"
"os"
"os/signal"
"syscall"
)

// init runs before main and calls loadConfig.
// NOTE(review): loadConfig is not shown in this listing; presumably it
// populates the package-level config read by api() — confirm.
func init() {
loadConfig()
}

// Entry point of the scheduler application.
//
// main starts the HTTP API and the GRPC server in their own goroutines and
// then blocks until SIGINT or SIGTERM is delivered, at which point it logs
// the signal and terminates the process through log.Fatalf.
func main() {
	go api()
	go startGRPCServer()

	// signal.Notify does not block when delivering a signal, so the channel
	// must be buffered; with an unbuffered channel a signal that arrives
	// before the receive below is ready would be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	// log.Fatalf exits the process, so a single receive is sufficient and no
	// surrounding loop is needed.
	s := <-sig
	log.Fatalf("Signal (%d) received, stopping\n", s)
}

The API uses Julien Schmidt’s HTTP router: github.com/julienschmidt/httprouter

The API exposes 3 endpoints to start/query/stop jobs on a worker. The handlers themselves are pretty standard and not important. The important parts are the 3 functions that do all the work: “startJobOnWorker”, “stopJobOnWorker”, “queryJobOnWorker”.

// scheduler/api.go
package main

import (
"encoding/json"
"io/ioutil"
"log"
"net/http"

"github.com/julienschmidt/httprouter"
)

// Header name and header value that the API handlers set on every response
// so that clients treat the body as JSON.
const (
// contentTypeHeader is the name of the HTTP header being set.
contentTypeHeader = "Content-Type"
// applicationJSONHeader is the value written for that header (despite the
// "Header" suffix it is a media type, not a header name).
applicationJSONHeader = "application/json"
)

// apiStartJob is the HTTP handler that starts a job on a worker.
//
// The request body must be a JSON document that unmarshals into
// apiStartJobReq. Responses are always JSON:
//   - 201 Created with apiStartJobRes{JobID} when the worker accepted the job
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or startJobOnWorker fails
func apiStartJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON payload. Once the
	// header is out an encoding failure cannot be reported to the client any
	// more, so it is logged instead of being silently dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiStartJob: encoding response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	startJobReq := apiStartJobReq{}
	if err := json.Unmarshal(body, &startJobReq); err != nil {
		// A malformed body is a client mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobID, err := startJobOnWorker(startJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusCreated, apiStartJobRes{JobID: jobID})
}

// apiStopJob is the HTTP handler that stops a job on a worker.
//
// The request body must be a JSON document that unmarshals into
// apiStopJobReq. Responses are always JSON:
//   - 200 OK with apiStopJobRes{Success: true} when the job was stopped
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or stopJobOnWorker fails
func apiStopJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON payload. Once the
	// header is out an encoding failure cannot be reported to the client any
	// more, so it is logged instead of being silently dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiStopJob: encoding response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	stopJobReq := apiStopJobReq{}
	if err := json.Unmarshal(body, &stopJobReq); err != nil {
		// A malformed body is a client mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	if err := stopJobOnWorker(stopJobReq); err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiStopJobRes{Success: true})
}

// apiQueryJob is the HTTP handler that reports the state of a job on a
// worker.
//
// The request body must be a JSON document that unmarshals into
// apiQueryJobReq. Responses are always JSON:
//   - 200 OK with apiQueryJobRes{Done, Error, ErrorText} as returned by
//     queryJobOnWorker
//   - 400 Bad Request with apiError when the body is not valid JSON
//   - 500 Internal Server Error with apiError when the body cannot be read
//     or queryJobOnWorker fails
func apiQueryJob(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	w.Header().Set(contentTypeHeader, applicationJSONHeader)

	// respond writes the status line followed by the JSON payload. Once the
	// header is out an encoding failure cannot be reported to the client any
	// more, so it is logged instead of being silently dropped.
	respond := func(status int, payload interface{}) {
		w.WriteHeader(status)
		if err := json.NewEncoder(w).Encode(payload); err != nil {
			log.Printf("apiQueryJob: encoding response: %v", err)
		}
	}

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	queryJobReq := apiQueryJobReq{}
	if err := json.Unmarshal(body, &queryJobReq); err != nil {
		// A malformed body is a client mistake, not a server failure.
		respond(http.StatusBadRequest, apiError{Error: err.Error()})
		return
	}

	jobDone, jobError, jobErrorText, err := queryJobOnWorker(queryJobReq)
	if err != nil {
		respond(http.StatusInternalServerError, apiError{Error: err.Error()})
		return
	}

	respond(http.StatusOK, apiQueryJobRes{
		Done:      jobDone,
		Error:     jobError,
		ErrorText: jobErrorText,
	})
}

// createRouter returns a new httprouter.Router with the three job endpoints
// registered as POST routes: /start, /stop and /query.
func createRouter() *httprouter.Router {
	// Route table, registered in the order listed.
	routes := []struct {
		path   string
		handle httprouter.Handle
	}{
		{"/start", apiStartJob},
		{"/stop", apiStopJob},
		{"/query", apiQueryJob},
	}

	r := httprouter.New()
	for _, rt := range routes {
		r.POST(rt.path, rt.handle)
	}
	return r
}

// api serves the HTTP API on config.HTTPServer.Addr using the routes from
// createRouter. It blocks while the server is running and terminates the
// process through log.Fatal if ListenAndServe returns an error.
func api() {
	addr := config.HTTPServer.Addr
	server := http.Server{Addr: addr, Handler: createRouter()}

	log.Println("HTTP Server listening on", addr)

	err := server.ListenAndServe()
	if err != nil {
		log.Fatal(err)
	}
}

Those 3 functions are located in the grpc_translator.go file, and they translate HTTP requests into GRPC requests on the specified worker.

// scheduler/grpc_translator.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to grpc
// request on the workers.
//
// The worker is looked up by req.WorkerID in the workers map; the job's
// Command and Path are forwarded in a StartJob RPC that is bounded by a
// one-second timeout.
//
// Returns:
// - string: job id
// - error: nil if no error
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to grpc
// request on the workers.
//
// The worker is looked up by req.WorkerID in the workers map; req.JobID is
// forwarded in a StopJob RPC that is bounded by a one-second timeout.
//
// Returns:
// - error: nil if no error
func stopJobOnWorker(req apiStopJobReq) error {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
		return err
	}

	return nil
}

// queryJobOnWorker translates the http query request to grpc
// request on the workers.
//
// The worker is looked up by req.WorkerID in the workers map; req.JobID is
// forwarded in a QueryJob RPC that is bounded by a one-second timeout.
//
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

GRPC translator calls worker functions to create and delete workers. These 2 functions just assign an id to a new worker and add the new worker to workers map and vice versa. Pretty standard. I am too bored to add them here.

Finally, the last important bit is the GRPC server of the scheduler, which is used by the workers to register and deregister on the scheduler.

// scheduler/grpc_server.go
package main

import (
"context"
"errors"
"time"

pb "github.com/koraygocmen/scheduler-worker-grpc/jobscheduler"
"google.golang.org/grpc"
)

// startJobOnWorker translates the http start request to grpc
// request on the workers.
//
// NOTE(review): this listing is captioned grpc_server.go but repeats the
// function of the same name from the grpc_translator.go listing — confirm
// which code was meant to appear here.
//
// The worker is looked up by req.WorkerID in the workers map; the job's
// Command and Path are forwarded in a StartJob RPC that is bounded by a
// one-second timeout.
//
// Returns:
// - string: job id
// - error: nil if no error
func startJobOnWorker(req apiStartJobReq) (string, error) {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	startJobReq := pb.StartJobReq{
		Command: req.Command,
		Path:    req.Path,
	}

	r, err := c.StartJob(ctx, &startJobReq)
	if err != nil {
		return "", err
	}

	return r.JobID, nil
}

// stopJobOnWorker translates the http stop request to grpc
// request on the workers.
//
// NOTE(review): this listing is captioned grpc_server.go but repeats the
// function of the same name from the grpc_translator.go listing — confirm
// which code was meant to appear here.
//
// The worker is looked up by req.WorkerID in the workers map; req.JobID is
// forwarded in a StopJob RPC that is bounded by a one-second timeout.
//
// Returns:
// - error: nil if no error
func stopJobOnWorker(req apiStopJobReq) error {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	stopJobReq := pb.StopJobReq{
		JobID: req.JobID,
	}

	if _, err := c.StopJob(ctx, &stopJobReq); err != nil {
		return err
	}

	return nil
}

// queryJobOnWorker translates the http query request to grpc
// request on the workers.
//
// NOTE(review): this listing is captioned grpc_server.go but repeats the
// function of the same name from the grpc_translator.go listing — confirm
// which code was meant to appear here.
//
// The worker is looked up by req.WorkerID in the workers map; req.JobID is
// forwarded in a QueryJob RPC that is bounded by a one-second timeout.
//
// Returns:
// - bool: job status (true if job is done)
// - bool: job error (true if job had an error)
// - string: job error text ("" if job error is false)
// - error: nil if no error
func queryJobOnWorker(req apiQueryJobReq) (bool, bool, string, error) {
	// Hold the mutex only while reading the workers map. Keeping it locked
	// across the dial and the RPC would serialize every API call and stall
	// anything else that needs the lock for up to the RPC timeout.
	var addr string
	workersMutex.Lock()
	worker, ok := workers[req.WorkerID]
	if ok {
		addr = worker.addr
	}
	workersMutex.Unlock()

	if !ok {
		return false, false, "", errors.New("worker not found")
	}

	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return false, false, "", err
	}
	defer conn.Close()
	c := pb.NewWorkerClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queryJobReq := pb.QueryJobReq{
		JobID: req.JobID,
	}

	r, err := c.QueryJob(ctx, &queryJobReq)
	if err != nil {
		return false, false, "", err
	}

	return r.Done, r.Error, r.ErrorText, nil
}

Alright, the worker is going to be slightly more complicated, but hopefully it will be the last piece.

--

--

--

Get smarter at building your thing. Follow to join The Startup’s +8 million monthly readers & +756K followers.

Recommended from Medium

Introducing pyspark_xray: a diagnostic tool that enables local debugging of PySpark applications…

EBOOK [P.D.F]

New post on The Gate’s Fanpage.

6 steps of software development which must be avoid by the developers

Developer Salaries Germany 2021 Report: tl;dr

What You Should Clarify Before Accepting a Job Offer

Handshake

Selecting gunicorn worker types for different python web application

Swift Basics Tutorial

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Koray Göçmen

Koray Göçmen

University of Toronto, Computer Engineering, architected and implemented reliable infrastructures and worked as the lead developer for multiple startups.

More from Medium

Distributed mutex based on PostgreSQL in Go. Complete example.

Simple data alignment technique to Speed Up Your Struct in Golang

Go Concurrency testing in CPU constrained environments

Redis to REST, securely