Scaling Go Applications

From bibbleWiki

Introduction

Resources

When we look at scaling we must consider the following resources.

  • Network Bandwidth
  • Processing Power
  • Available Memory
  • Data Storage

For this page we will be looking at the challenges of breaking an application into several servers, following an architecture such as the one below.

Go Scale overview.png

This is not meant to be THE solution but a walkthrough of some of the challenges of scaling. It is split into

  • Initial Optimizations
  • Creating a Load Balancer
  • Caching
  • Centralized Logging

AGAIN, this is one approach, and of course you could use third-party cloud solutions instead. This one is confined to Docker and Go.

Initial Optimizations

Content Compression

One of the easiest things to do is to compress our HTTP responses.

Gzip Handler

For this we write a GzipHandler.

package util

import (
	"compress/gzip"
	"net/http"
	"strings"
)

type CloseableResponseWriter interface {
	http.ResponseWriter
	Close()
}

type gzipResponseWriter struct {
	http.ResponseWriter
	*gzip.Writer
}

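func (w gzipResponseWriter) Write(data []byte) (int, error) {
	h := w.ResponseWriter.Header()
	// Sniff the content type from the uncompressed data; if left unset,
	// net/http would sniff the compressed bytes and send application/x-gzip.
	if h.Get("Content-Type") == "" {
		h.Set("Content-Type", http.DetectContentType(data))
	}
	// Any Content-Length set by a handler describes the uncompressed body.
	h.Del("Content-Length")
	return w.Writer.Write(data)
}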

func (w gzipResponseWriter) Close() {
	w.Writer.Close()
}

func (w gzipResponseWriter) Header() http.Header {
	return w.ResponseWriter.Header()
}

type closeableResponseWriter struct {
	http.ResponseWriter
}

func (w closeableResponseWriter) Close() {}

func GetResponseWriter(w http.ResponseWriter, req *http.Request) CloseableResponseWriter {
	// The body varies with Accept-Encoding, so tell caches about it.
	w.Header().Add("Vary", "Accept-Encoding")
	if !strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
		// The client cannot accept gzip: pass writes straight through.
		return closeableResponseWriter{ResponseWriter: w}
	}
	w.Header().Set("Content-Encoding", "gzip")
	return gzipResponseWriter{
		ResponseWriter: w,
		Writer:         gzip.NewWriter(w),
	}
}

type GzipHandler struct{}

func (h *GzipHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	responseWriter := GetResponseWriter(w, r)
	defer responseWriter.Close()

	http.DefaultServeMux.ServeHTTP(responseWriter, r)
}

Implement on Server

We can now use this within the server.

func main() {
...
    go http.ListenAndServe(":3000", new(util.GzipHandler))

Move to HTTP/2

HTTP/2 provides

  • Header compression
  • Reusable Connections
  • Server Push

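To move to HTTP/2 we need to run over HTTPS. Go's net/http server negotiates HTTP/2 automatically when it serves over TLS. Go also ships a script that generates a self-signed certificate (cert.pem and key.pem), and I run it manually. The path below depends on where Go is installed; go env GOROOT shows the root.

go run /usr/lib/go/src/crypto/tls/generate_cert.go -host localhost

Then you just need to change ListenAndServe to ListenAndServeTLS and pass the cert and key files. I did not have problems using the self-signed certs, but here is what some people have done so that Go clients accept them (this turns off certificate verification on the default client).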

package data

import (
	"crypto/tls"
	"flag"
	"net/http"
)

var dataServiceUrl = flag.String("dataservice", "https://localhost:4000", "Address of the data service provider")

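func init() {
	tr := &http.Transport{
		TLSClientConfig: &tls.Config{
			// Accept the self-signed certificates used in development.
			InsecureSkipVerify: true,
		},
		// Setting a custom TLSClientConfig disables HTTP/2 in the client
		// unless ForceAttemptHTTP2 is set.
		ForceAttemptHTTP2: true,
	}

	http.DefaultClient = &http.Client{Transport: tr}
}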

Creating a Load Balancer

Introduction

To implement a load balancer we need

  • Load Balancer
  • Provider Registration

Load Balancer

This is a very simple load balancer to start with. Each incoming web request is wrapped in a struct holding its request, its response writer and a channel that is signalled once the response has been written.

type webRequest struct {
	r      *http.Request
	w      http.ResponseWriter
	doneCh chan struct{}
}


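Two goroutines are launched, one to process requests and one to listen for them. The "/" handler puts each request on requestCh and then blocks on doneCh until the response has been written.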

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		doneCh := make(chan struct{})
		requestCh <- &webRequest{r: r, w: w, doneCh: doneCh}
		<-doneCh
	})

	go processRequests()

	go http.ListenAndServeTLS(":2000", "cert.pem", "key.pem", nil)

	log.Println("Server started, press <ENTER> to exit")
	fmt.Scanln()
}


The process keeps a list of app servers, the current index and a client

var (
	appservers   = []string{}
	currentIndex = 0
	client       = http.Client{Transport: &transport}
)


processRequests listens for requests on the channel and moves the index on each time, round-robin, so that the next server is used.

func processRequests() {
	for {
		select {
		case request := <-requestCh:
			println("request")
			if len(appservers) == 0 {
				request.w.WriteHeader(http.StatusInternalServerError)
				request.w.Write([]byte("No app servers found"))
				request.doneCh <- struct{}{}
				continue
			}
			// Modulo keeps the index in range even after servers unregister.
			currentIndex = (currentIndex + 1) % len(appservers)
			host := appservers[currentIndex]
			go processRequest(host, request)
		}
	}
}

Finally the requests are processed.

  • The incoming request is copied to a new request for the app server
  • The request is sent to the app server
  • The response headers and body are copied back to the original response
  • The incoming request is marked as done

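func processRequest(host string, request *webRequest) {
	// Always release the waiting handler, whichever way we return.
	defer func() { request.doneCh <- struct{}{} }()

	// Point the incoming URL (path and query) at the chosen app server.
	hostURL, err := url.Parse(request.r.URL.String())
	if err != nil {
		request.w.WriteHeader(http.StatusBadRequest)
		return
	}
	hostURL.Scheme = "https"
	hostURL.Host = host

	req, err := http.NewRequest(request.r.Method, hostURL.String(), request.r.Body)
	if err != nil {
		request.w.WriteHeader(http.StatusInternalServerError)
		return
	}
	// Copy each header value separately; joining them with spaces corrupts them.
	for k, values := range request.r.Header {
		for _, v := range values {
			req.Header.Add(k, v)
		}
	}

	resp, err := client.Do(req)
	if err != nil {
		// The app server could not be reached.
		request.w.WriteHeader(http.StatusBadGateway)
		return
	}
	defer resp.Body.Close()

	// Copy headers, status code and body back to the original caller.
	for k, values := range resp.Header {
		for _, v := range values {
			request.w.Header().Add(k, v)
		}
	}
	request.w.WriteHeader(resp.StatusCode)
	io.Copy(request.w, resp.Body)
}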

Provider Registration

Provider registration consists of

  • Service Discovery
  • Service Termination
  • Heartbeat Checking

Service Discovery

This is how an app server makes the load balancer aware of it. When implementing this we need to think about whether each app server handles all services or just some of them.

Service Termination

This is where an app server lets the load balancer know it is terminating. We need to consider any in-flight transactions.

Heartbeat Checking

This is a regular poll from the load balancer to make sure each service is still available.

Load Balancer Implementation

For the load balancer, registration and deregistration are handled on a separate port so that this port is not exposed to the public.

Processing requests

In the main function a second listener is started on port 2001, with a handler for the /register and /unregister routes.

	go http.ListenAndServeTLS(":2001", "cert.pem", "key.pem", new(appserverHandler))
...
type appserverHandler struct{}

func (h *appserverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// RemoteAddr is "ip:port"; SplitHostPort also handles IPv6 addresses.
	ip, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// The app server passes the port it listens on as ?port=.
	host := net.JoinHostPort(ip, r.URL.Query().Get("port"))
	switch r.URL.Path {
	case "/register":
		registerCh <- host
	case "/unregister":
		unregisterCh <- host
	}
}

Within the existing processRequests function, register, unregister and heartbeat handling are added as extra cases of the select statement.

Register

This checks whether the host is already registered. If it is not, it is appended to the list of available servers.

		case host := <-registerCh:
			println("register " + host)
			isFound := false
			for _, h := range appservers {
				if host == h {
					isFound = true
					break
				}
			}

			if !isFound {
				appservers = append(appservers, host)
			}

Unregister

Unregister simply removes the host from the list of strings.

		case host := <-unregisterCh:
			println("unregister " + host)
			for i := len(appservers) - 1; i >= 0; i-- {
				if appservers[i] == host {
					appservers = append(appservers[:i], appservers[i+1:]...)
				}
			}

Heartbeat

The heartbeat check runs in a separate goroutine so that it does not block the processRequests loop. It simply issues a GET to /ping on each server and, where no successful response comes back, sends an unregister.

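		case <-heartbeartCh:
			println("heartbeat")
			// Copy the slice: unregister modifies appservers in place, so sharing
			// its backing array with the goroutine would be a data race.
			servers := append([]string(nil), appservers...)
			go func(servers []string) {
				for _, server := range servers {
					resp, err := http.Get("https://" + server + "/ping")
					if err != nil {
						unregisterCh <- server
						continue
					}
					resp.Body.Close() // release the connection
					if resp.StatusCode != http.StatusOK {
						unregisterCh <- server
					}
				}
			}(servers)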

App Server Implementation

Introduction

This is a poor way to do this, and we should look at ways to improve it.

Register

Let's add a flag (a variable which can be overridden at runtime) holding the load balancer's address. Then, after starting the server, we wait a second to make sure it has started (aaarrrrgghhh) and send the registration to the load balancer.

var loadbalancerURL = flag.String("loadbalancer", "https://172.18.0.12:2001", "Address of the load balancer")
...
	time.Sleep(1 * time.Second)
	http.Get(*loadbalancerURL + "/register?port=3000")

Unregister

To unregister, just call the unregister route after the server has ended.

	http.Get(*loadbalancerURL + "/unregister?port=3000")

Heartbeat

The heartbeat simply responds with a 200.

	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

Problems

We probably want to look at how to stop pinging servers which are in the process of being deregistered.

Caching

Introduction

As with any optimization, the first thing to do is look at what is going on and understand the problem. If you don't, you have nothing to compare against.
The original approach took 590ms.
Scaling Transactions.png
Introducing the load balancer increased the transaction time to 737ms. It also increased potential capacity, since more App Servers can be added behind the load balancer.
Scaling Transaction2.png
If we now introduce a cache, we can reduce the 737ms down to 447ms, which means we are faster than the original. This is a great way to sell it to the business, but of course the question might be: can you put a cache in the original solution? Sigh!
Scaling Transactions3.png

Approach

So the approach is to build a cache service which supports

  • Storing data
  • Expiration of data
  • Invalidation of data

Caching Service Implementation

Main Function

As ever, get the high level right first. We start a service which supports two routes, "/" and "/invalidate". The "/" handler either gets from the cache (GET) or saves to it (POST). We also start a purge function in the background.

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet {
			getFromCache(w, r)
		} else if r.Method == http.MethodPost {
			saveToCache(w, r)
		}
	})

	http.HandleFunc("/invalidate", invalidateEntry)

	go http.ListenAndServeTLS(":5000", "cert.pem", "key.pem", nil)

	go purgeCache()

	log.Println("Caching service started, press <ENTER> to exit")
	fmt.Scanln()
}

Data

We create a structure for a cache entry, consisting of the data and its expiry time.

type cacheEntry struct {
	data       []byte
	expiration time.Time
}


Secondly, we need a map for the cache, a read/write mutex to avoid concurrency issues, a ticker channel to drive our purge function and a regular expression to extract the max age.

var (
	cache  = make(map[string]*cacheEntry)
	mutex  = sync.RWMutex{}
	tickCh = time.Tick(5 * time.Second)
)

// Matches the standard "max-age=N" directive as well as "maxage=N".
var maxAgeRexexp = regexp.MustCompile(`max-?age=(\d+)`)
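The standard Cache-Control directive is spelled max-age, and a header can carry several comma-separated directives. If you would rather not use a regular expression, below is a sketch that parses the directive directly. The name parseMaxAge is hypothetical. It also accepts the maxage spelling used by this page's client, and strings.Cut needs Go 1.18 or later.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseMaxAge returns the number of seconds in a Cache-Control max-age
// directive (also accepting "maxage"), and false if none is present or valid.
func parseMaxAge(header string) (int, bool) {
	for _, directive := range strings.Split(header, ",") {
		name, value, found := strings.Cut(strings.TrimSpace(directive), "=")
		if !found {
			continue
		}
		name = strings.ToLower(name)
		if name != "max-age" && name != "maxage" {
			continue
		}
		seconds, err := strconv.Atoi(value)
		if err != nil || seconds < 0 {
			return 0, false
		}
		return seconds, true
	}
	return 0, false
}

func main() {
	fmt.Println(parseMaxAge("public, max-age=60")) // 60 true
	fmt.Println(parseMaxAge("maxage=30"))          // 30 true
	fmt.Println(parseMaxAge("no-store"))           // 0 false
}
```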

Get From Cache

This reads from the cache. It takes a read lock so that no other function can modify the map while we are reading. It then looks up the entry, sending it back if found and otherwise responding with not found. Pretty simple.

func getFromCache(w http.ResponseWriter, r *http.Request) {
	mutex.RLock()
	defer mutex.RUnlock()

	key := r.URL.Query().Get("key")

	fmt.Printf("Searching cache for %s...", key)
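	// Treat entries past their expiry as misses even before the purge removes them.
	if entry, ok := cache[key]; ok && time.Now().Before(entry.expiration) {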
		fmt.Println("found")
		w.Write(entry.data)
		return
	}

	w.WriteHeader(http.StatusNotFound)
	fmt.Println("not found")
}

Save to Cache

No surprises: lock to ensure only one writer at a time, get the max age and add the item to the cache with the calculated expiry time.

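func saveToCache(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Query().Get("key")
	cacheHeader := r.Header.Get("cache-control")

	// Parse the max age and read the body before taking the lock, so a slow
	// client cannot block every other cache operation.
	matches := maxAgeRexexp.FindStringSubmatch(cacheHeader)
	if len(matches) != 2 {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	dur, err := strconv.Atoi(matches[1])
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	data, err := ioutil.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	fmt.Printf("Saving cache entry with key '%s' for %d seconds\n", key, dur)

	mutex.Lock()
	defer mutex.Unlock()
	cache[key] = &cacheEntry{data: data, expiration: time.Now().Add(time.Duration(dur) * time.Second)}
}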

Invalidate Entry

Lock and delete the key. Deleting a key that is not in the map is a no-op in Go, so a missing key is not reported as an error, but we get the picture.

func invalidateEntry(w http.ResponseWriter, r *http.Request) {
	mutex.Lock()
	defer mutex.Unlock()
	key := r.URL.Query().Get("key")
	fmt.Printf("purging entry with key '%s'\n", key)
	delete(cache, key)
}

Purge

A nice example of running a function every x seconds, with no real surprises. It removes any cache entries which have expired.

func purgeCache() {
	for range tickCh {
		mutex.Lock()
		now := time.Now()

		fmt.Println("purging cache")

		for k, v := range cache {
			// Expired means now is at or after the expiration time.
			if !now.Before(v.expiration) {
				fmt.Printf("purging entry with key '%s'\n", k)
				delete(cache, k)
			}
		}
		mutex.Unlock()
	}
}

App Server Implementation

Implement Calls to the Service

We need to implement calls to the service. First we create a flag so that the address of the service can be configured:

var cachServiceURL = flag.String("cacheservice", "https://172.18.0.13:5000", "Address of the caching service provider")

Get From Cache

We call the service with the key provided and return either nil and false if it is not found, or the cached response body and true.

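func getFromCache(key string) (io.ReadCloser, bool) {
	resp, err := http.Get(*cachServiceURL + "/?key=" + key)
	if err != nil {
		println("get fail")
		return nil, false
	}
	if resp.StatusCode != http.StatusOK {
		// Cache miss: close the body so the connection can be reused.
		resp.Body.Close()
		return nil, false
	}
	// The caller must close the returned body.
	return resp.Body, true
}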

Save To Cache

We call the cache service to add an entry, passing the duration in seconds.

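func saveToCache(key string, duration int64, data []byte) {
	req, err := http.NewRequest(http.MethodPost, *cachServiceURL+"/?key="+key, bytes.NewBuffer(data))
	if err != nil {
		return
	}
	// duration is in seconds; the caching service reads it from this header.
	req.Header.Add("cache-control", "maxage="+strconv.FormatInt(duration, 10))
	if resp, err := http.DefaultClient.Do(req); err == nil {
		resp.Body.Close() // release the connection
	}
}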

Invalidate Cache

Very simply, send the key to the invalidate route.

func invalidateCacheEntry(key string) {
	if resp, err := http.Get(*cachServiceURL + "/invalidate?key=" + key); err == nil {
		resp.Body.Close() // release the connection
	}
}

Implement In the Controller

We need to use these utility functions in the controller

  • showBlogPostList
  • showBlogPost
  • updateBlogPost

In the two read handlers we first check whether the response already exists in the cache.

Show Blog List

If we find it in the cache, we return it:

	cacheKey := url.QueryEscape(r.URL.RequestURI())
	resp, ok := getFromCache(cacheKey)
	if ok {
		io.Copy(w, resp)
		resp.Close()
		return
	}


Otherwise we build the list as we normally would and start a goroutine to add it to the cache.

...
	// saveToCache expects seconds; int64(24*time.Hour) would be nanoseconds.
	go saveToCache(cacheKey, int64(24*time.Hour/time.Second), data[:])

Show Blog Post

Again, check whether it exists in the cache.

	cacheKey := url.QueryEscape(r.URL.RequestURI())
	resp, ok := getFromCache(cacheKey)
	if ok {
		io.Copy(w, resp)
		resp.Close()
		return
	}


Otherwise we build the post as we normally would and start a goroutine to add it to the cache.

...
	// saveToCache expects seconds; int64(24*time.Hour) would be nanoseconds.
	go saveToCache(cacheKey, int64(24*time.Hour/time.Second), data[:])

Update Blog Post

We are updating, so we now need to invalidate the cached entry because we know it has changed.

...
	go invalidateCacheEntry(url.QueryEscape(r.RequestURI))

Centralized Logging

Introduction

This is probably best done with a package, but it is here to demonstrate an approach in Go. It consists of a logging process and a helper function shared by the load balancer and the app servers.

Logging Process

Log Entry Entity

This is the structure to contain the log entries

package entity

import "time"

type LogLevel string

const (
	LogLevelInfo  LogLevel = "INFO"
	LogLevelError LogLevel = "ERROR"
	LogLevelPanic LogLevel = "PANIC"
)

type LogEntry struct {
	Level     LogLevel
	Timestamp time.Time
	Source    string
	Message   string
}

A slice of this structure stores the log entries. We implement Len, Swap and Less so that the slice satisfies sort.Interface and can be sorted by timestamp.

type logEntries []entity.LogEntry

func (le logEntries) Len() int {
	return len(le)
}

func (le logEntries) Swap(i, j int) {
	le[i], le[j] = le[j], le[i]
}

func (le logEntries) Less(i, j int) bool {
	return le[i].Timestamp.Before(le[j].Timestamp)
}

Main Process

For the data and structures we have a mutex to avoid concurrency issues, a path for the log file, a ticker channel that drives the write loop, and a delay that gives slow entries time to arrive before they are written.

Main Structures

var mutex sync.Mutex
var entries logEntries

const logPath = "/log/log.txt"

var tickCh = time.Tick(2 * time.Second)
var writeDelay = 2 * time.Second

Main Function

In the main function we register the handler, create (or truncate) the log file, start the web service and start the write-log process.

	http.HandleFunc("/", storeEntry)

	f, _ := os.Create(logPath)
	f.Close()

	go http.ListenAndServeTLS(":6000", "cert.pem", "key.pem", nil)

	go writeLog()

	log.Println("Log service started, press <ENTER> to exit")
	fmt.Scanln()

Main Server Handler

The handler decodes the JSON, checking for errors. If there is no error, the entry is appended to the slice.

func storeEntry(w http.ResponseWriter, r *http.Request) {
	dec := json.NewDecoder(r.Body)
	var entry entity.LogEntry
	err := dec.Decode(&entry)

	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	mutex.Lock()
	entries = append(entries, entry)
	mutex.Unlock()
}

Main Server Write An Entry

This just formats the entry for us

func writeEntry(entry entity.LogEntry) string {

	return fmt.Sprintf("%v;%v;%v;%v\n",
		entry.Timestamp.Format("2006-01-02 15:04:05"),
		entry.Level, entry.Source, entry.Message)
}

Main Server Write Log

This is the main loop, and it runs forever, once every time a tick is received.

  • The file is opened
  • The target time (now minus writeDelay) is calculated
  • The entries are sorted by timestamp
  • Entries older than the target time are written; the rest are kept for the next tick

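func writeLog() {
	for range tickCh {
		mutex.Lock()

		logFile, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY, 0664)
		if err != nil {
			fmt.Println(err)
			// Release the lock before skipping this tick, otherwise
			// storeEntry would block forever.
			mutex.Unlock()
			continue
		}

		// Only write entries older than writeDelay, so late arrivals can
		// still be sorted into place on a later tick.
		targetTime := time.Now().Add(-writeDelay)
		sort.Sort(entries)

		written := 0
		for _, entry := range entries {
			if !entry.Timestamp.Before(targetTime) {
				break
			}
			if _, err := logFile.WriteString(writeEntry(entry)); err != nil {
				fmt.Println(err)
			}
			written++
		}
		// Keep the entries that are still inside the delay window.
		entries = entries[written:]

		logFile.Close()
		mutex.Unlock()
	}
}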

Logging Helper

To avoid duplicating this code, we provide a helper for using the logging service. I guess you could argue that this is a service interface plus WriteEntry.

// The log service listens on port 6000 (see its main function).
var logserviceURL = flag.String("logservice", "https://172.18.0.14:6000",
	"Address of the logging service")

var tr = http.Transport{
	TLSClientConfig: &tls.Config{
		InsecureSkipVerify: true,
	},
}
var client = &http.Client{Transport: &tr}

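func WriteEntry(entry *entity.LogEntry) {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(entry); err != nil {
		return
	}
	req, err := http.NewRequest(http.MethodPost, *logserviceURL, &buf)
	if err != nil {
		return
	}
	req.Header.Set("Content-Type", "application/json")
	// Close the body so the underlying connection can be reused.
	if resp, err := client.Do(req); err == nil {
		resp.Body.Close()
	}
}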

Logging Clients

So here is how to use it, in this case in the load balancer.

	go loghelper.WriteEntry(&entity.LogEntry{
		Level:     entity.LogLevelInfo,
		Timestamp: time.Now(),
		Source:    "load balancer",
		Message:   "Registering application server with address: " + host,
	})