Scaling Go Applications: Difference between revisions

From bibbleWiki
Jump to navigation Jump to search
Line 348: Line 348:
*Expiration of data
*Expiration of data
*Invalidation of data
*Invalidation of data
==Caching Function==
===Main Function===
===Main Function===
As ever get the high level right. Lets start a service which supports two routes, slash "/" and invalidate. The slash function will be handling either a get or a save to the cache. Secondly create a purge function
As ever get the high level right. Lets start a service which supports two routes, slash "/" and invalidate. The slash function will be handling either a get or a save to the cache. Secondly create a purge function

Revision as of 12:56, 29 January 2021

Introduction

Resources

When we look at scaling we must consider the following resources.

  • Network Bandwidth
  • Processing Power
  • Available Memory
  • Data Storage

For this page we will be looking a the challenges around breaking an application in to several servers and following an architecture such as below This not meant to be THE solution but a walkthrough on some of the challenges to scale. This will be split into

  • Initial Optimizations
  • Creating a Load Balancer
  • Caching
  • Centralized Logging

AGAIN this is an approach and of course you can use third-party cloud solutions. This one will be confined to Docker and GO.

Initial Optimizations

Content Compression

One of the easiest things to do is to change the http messages to use compression in our messages.

Gzip Handler

For this we write a GzipHandler.

package util

import (
	"compress/gzip"
	"net/http"
	"strings"
)

type CloseableResponseWriter interface {
	http.ResponseWriter
	Close()
}

type gzipResponseWriter struct {
	http.ResponseWriter
	*gzip.Writer
}

func (w gzipResponseWriter) Write(data []byte) (int, error) {
	return w.Writer.Write(data)
}

func (w gzipResponseWriter) Close() {
	w.Writer.Close()
}

func (w gzipResponseWriter) Header() http.Header {
	return w.ResponseWriter.Header()
}

type closeableResponseWriter struct {
	http.ResponseWriter
}

func (w closeableResponseWriter) Close() {}

func GetResponseWriter(w http.ResponseWriter, req *http.Request) CloseableResponseWriter {
	if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
		w.Header().Set("Content-Encoding", "gzip")
		gRW := gzipResponseWriter{
			ResponseWriter: w,
			Writer:         gzip.NewWriter(w),
		}
		return gRW
	} else {
		return closeableResponseWriter{ResponseWriter: w}
	}
}

type GzipHandler struct{}

func (h *GzipHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	responseWriter := GetResponseWriter(w, r)
	defer responseWriter.Close()

	http.DefaultServeMux.ServeHTTP(responseWriter, r)
}

Implement on Server

To we can now use this within the server.

func main() {
...
    go http.ListenAndServe(":3000", new(util.GzipHandler))

Move to HTTP/2

This provides

  • Header compression
  • Reusable Connections
  • Server Push

To move to http/2 we need to run over https. Within go you can run this but I do mine manually

go run /usr/lib/go/src/crypto/tls/generate_cert.go -host localhost

Then to use you just need to change the ListenAndServ to ListenAndServTLS and add the cert and key. I did not have problems using the self signed certs but here is what some people have done.

package data

import (
	"crypto/tls"
	"flag"
	"net/http"
)

var dataServiceUrl = flag.String("dataservice", "https://localhost:4000", "Address of the data service provider")

func init() {
	tr := http.Transport{
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
	}

	http.DefaultClient = &http.Client{Transport: &tr}
}

Creating a Load Balancer

Introduction

To implement a load balancer we need

  • Load Balancer
  • Provider Registration

Load Balancer

This is a very simple load balancer to start. Each webrequest coming in contains it's request, response and a channel

type webRequest struct {
	r      *http.Request
	w      http.ResponseWriter
	doneCh chan struct{}
}


Two go routines are launched, one to process requests and one to listen for the requests

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		doneCh := make(chan struct{})
		requestCh <- &webRequest{r: r, w: w, doneCh: doneCh}
		<-doneCh
	})

	go processRequests()

	go http.ListenAndServeTLS(":2000", "cert.pem", "key.pem", nil)

	log.Println("Server started, press <ENTER> to exit")
	fmt.Scanln()
}


The process keeps a list of app servers, the current index and a client

var (
	appservers   = []string{}
	currentIndex = 0
	client       = http.Client{Transport: &transport}
)


The process requests listens for requests on the channel, increments the index to ensure the next server is used.

func processRequests() {
	for {
		select {
		case request := <-requestCh:
			println("request")
			if len(appservers) == 0 {
				request.w.WriteHeader(http.StatusInternalServerError)
				request.w.Write([]byte("No app servers found"))
				request.doneCh <- struct{}{}
				continue
			}
			currentIndex++
			if currentIndex == len(appservers) {
				currentIndex = 0
			}
			host := appservers[currentIndex]
			go processRequest(host, request)
		}
	}
}

Finally the requests are processed.

  • Incoming Request is copied to a new app server request
  • Request is processed
  • Response headers a copied back to original request
  • Incoming Request is marked as done
func processRequest(host string, request *webRequest) {
	hostURL, _ := url.Parse(request.r.URL.String())
	hostURL.Scheme = "https"
	hostURL.Host = host
	println(host)
	println(hostURL.String())
	req, _ := http.NewRequest(request.r.Method, hostURL.String(), request.r.Body)
	for k, v := range request.r.Header {
		values := ""
		for _, headerValue := range v {
			values += headerValue + " "
		}
		req.Header.Add(k, values)
	}

	resp, err := client.Do(req)

	if err != nil {
		request.w.WriteHeader(http.StatusInternalServerError)
		request.doneCh <- struct{}{}
		return
	}

	for k, v := range resp.Header {
		values := ""
		for _, headerValue := range v {
			values += headerValue + " "
		}
		request.w.Header().Add(k, values)
	}
	io.Copy(request.w, resp.Body)

	request.doneCh <- struct{}{}
}

Provider Registration

Provider registration consists of

  • Service Discovery
  • Service Termination
  • Heartbeat Checking

Service Discovery

This is how the service makes the load balancer aware of them. When we implement this we need to think about if they process all services or just some of the services.

Service Termination

This is where the service lets the load balancer know it is terminating. We need to consider the in flight transactions.

Heartbeat Checking

This is a poll to the load balancing to ensure the service is available

Load Balancer Implementation

For the load balancer the processing of registration, deregistration was down on a separate port so that this port was not not expose to the public.

Processing requests

In the main function a new listener is created and a new handler which handles the register and unregister.

	go http.ListenAndServeTLS(":2001", "cert.pem", "key.pem", new(appserverHandler))
...
type appserverHandler struct{}

func (h *appserverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	ip := strings.Split(r.RemoteAddr, ":")[0]
	port := r.URL.Query().Get("port")
	switch r.URL.Path {
	case "/register":
		registerCh <- ip + ":" + port
	case "/unregister":
		unregisterCh <- ip + ":" + port
	}
}

Within the existing processRequests function, the register, unregister and heartbeat functions are implemented.

Register

This checks if it already exists. If it does not then it is appended to the available servers.

		case host := <-registerCh:
			println("register " + host)
			isFound := false
			for _, h := range appservers {
				if host == h {
					isFound = true
					break
				}
			}

			if !isFound {
				appservers = append(appservers, host)
			}

Unregister

The unregister simply removes it from the list of strings

    case host := <-unregisterCh:
	println("unregister " + host)
	for i := len(appservers) - 1; i >= 0; i-- {
	    if appservers[i] == host {
		appservers = append(appservers[:i], appservers[i+1:]...)
	    }
	}

Heartbeat

The heartbeast is a go function to ensure the processing is not blocking the main thread. It simply issues a get to /ping and where no response is provided it sends a deregister.

	case <-heartbeartCh:
	    println("heartbeat")
	    servers := appservers[:]
	    go func(servers []string) {
		for _, server := range servers {
			resp, err := http.Get("https://" + server + "/ping")
			if err != nil || resp.StatusCode != 200 {
				unregisterCh <- server
			}
		}
	    }(servers)

App Server Implementation

Introduction

This is a poor way to do this and we should look at a way to improve this.

Register

Lets add a flag, which is a variable which can be override at runtime, with the load balancer and then after starting the server wait a second to ensure started aaarrrrgghhh and then send the register to the load balancer

var loadbalancerURL = flag.String("loadbalancer", "https://172.18.0.12:2001", "Address of the load balancer")
...
	time.Sleep(1 * time.Second)
	http.Get(*loadbalancerURL + "/register?port=3000")

Unregister

To unregister just call the unregister route after server ended

	http.Get(*loadbalancerURL + "/unregister?port=3000")

Heartbeat

The heartbeat simply responds with a 200.

	http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

Problems

We probably want to look at how to stop pinging of servers which are in the process of being deregistered.

Caching

Introduction

Quite like the first thing to do is look at what is going on and understand the problem. If you don't you have nothing to compare with.
The original approach took 590s

Introducing the load balancer increased the transaction to 737ms but also increased the possibilities for capacity as each App Server will only impact the load balancer.

If we now introduce a cache we can reduce the 737ms down to 447 which will mean we are faster than the original. This is a great way to sell to the business but of course the question might be can you put a cache in the original solution sigh!

Approach

So the approach is to build a cache service which supports

  • Storing data
  • Expiration of data
  • Invalidation of data

Caching Function

Main Function

As ever get the high level right. Lets start a service which supports two routes, slash "/" and invalidate. The slash function will be handling either a get or a save to the cache. Secondly create a purge function

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.Method == http.MethodGet {
			getFromCache(w, r)
		} else if r.Method == http.MethodPost {
			saveToCache(w, r)
		}
	})

	http.HandleFunc("/invalidate", invalidateEntry)

	go http.ListenAndServeTLS(":5000", "cert.pem", "key.pem", nil)

	go purgeCache()

	log.Println("Caching service started, press <ENTER> to exit")
	fmt.Scanln()
}

Data

We create structure for the cache entry which consists of the data and the expiry date.

type cacheEntry struct {
	data       []byte
	expiration time.Time
}


Secondly an array for the cache, a mutex to ensure no issues with concurrency, a channel to support our purge function and a regex function to derive the max age.

var (
	cache  = make(map[string]*cacheEntry)
	mutex  = sync.RWMutex{}
	tickCh = time.Tick(5 * time.Second)
)

var maxAgeRexexp = regexp.MustCompile(`maxage=(\d+)`)

Get From Cache

This reads from the cache. I gets a lock to stop other functions deleting which reading, looks for the entry sending it back if found otherwise sending not found. Pretty simple.

func getFromCache(w http.ResponseWriter, r *http.Request) {
	mutex.RLock()
	defer mutex.RUnlock()

	key := r.URL.Query().Get("key")

	fmt.Printf("Searching cache for %s...", key)
	if entry, ok := cache[key]; ok {
		fmt.Println("found")
		w.Write(entry.data)
		return
	}

	w.WriteHeader(http.StatusNotFound)
	fmt.Println("not found")
}

Save to Cache

No surprises, lock to ensure on at a time. Get the max age and add the item to the cache with the calculated date.

func saveToCache(w http.ResponseWriter, r *http.Request) {
	mutex.Lock()
	defer mutex.Unlock()

	key := r.URL.Query().Get("key")
	cacheHeader := r.Header.Get("cache-control")

	fmt.Printf("Saving cache entry with key '%s' for %s seconds\n", key, cacheHeader)

	matches := maxAgeRexexp.FindStringSubmatch(cacheHeader)
	if len(matches) == 2 {
		dur, _ := strconv.Atoi(matches[1])
		data, _ := ioutil.ReadAll(r.Body)
		cache[key] = &cacheEntry{data: data, expiration: time.Now().Add(time.Duration(dur) * time.Second)}
	}
}

Invalidate Entry

Lock and delete the key, cannot remote if no key is an error but we get the picture.

func invalidateEntry(w http.ResponseWriter, r *http.Request) {
	mutex.Lock()
	defer mutex.Unlock()
	key := r.URL.Query().Get("key")
	fmt.Printf("purging entry with key '%s'\n", key)
	delete(cache, key)
}

Purge

A nice example of running a function every x seconds, no real surprises. Removes any cache entries which have expired.

func purgeCache() {
	for range tickCh {
		mutex.Lock()
		now := time.Now()

		fmt.Println("purging cache")

		for k, v := range cache {
			if now.Before(v.expiration) {
				fmt.Printf("purging entry with key '%s'\n", k)
				delete(cache, k)
			}
		}
		mutex.Unlock()
	}
}

Centralized Logging