Scaling Go Applications
Introduction
Resources
When we look at scaling we must consider the following resources.
- Network Bandwidth
- Processing Power
- Available Memory
- Data Storage
For this page we will be looking at the challenges around breaking an application into several servers, following an architecture such as the one below.
This is not meant to be THE solution but a walkthrough of some of the challenges of scaling. It will be split into
- Initial Optimizations
- Creating a Load Balancer
- Caching
- Centralized Logging
Again, this is just one approach and of course you can use third-party cloud solutions instead. This walkthrough is confined to Docker and Go.
Initial Optimizations
Content Compression
One of the easiest things to do is to enable compression on our HTTP messages.
Gzip Handler
For this we write a GzipHandler.
package util
import (
"compress/gzip"
"net/http"
"strings"
)
type CloseableResponseWriter interface {
http.ResponseWriter
Close()
}
type gzipResponseWriter struct {
http.ResponseWriter
*gzip.Writer
}
func (w gzipResponseWriter) Write(data []byte) (int, error) {
return w.Writer.Write(data)
}
func (w gzipResponseWriter) Close() {
w.Writer.Close()
}
func (w gzipResponseWriter) Header() http.Header {
return w.ResponseWriter.Header()
}
type closeableResponseWriter struct {
http.ResponseWriter
}
func (w closeableResponseWriter) Close() {}
// GetResponseWriter wraps the ResponseWriter with a gzip writer when the client accepts it.
func GetResponseWriter(w http.ResponseWriter, req *http.Request) CloseableResponseWriter {
if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
gRW := gzipResponseWriter{
ResponseWriter: w,
Writer: gzip.NewWriter(w),
}
return gRW
} else {
return closeableResponseWriter{ResponseWriter: w}
}
}
type GzipHandler struct{}
func (h *GzipHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
responseWriter := GetResponseWriter(w, r)
defer responseWriter.Close()
http.DefaultServeMux.ServeHTTP(responseWriter, r)
}
Implement on Server
We can now use this within the server.
func main() {
...
go http.ListenAndServe(":3000", new(util.GzipHandler))
Move to HTTP/2
This provides
- Header compression
- Reusable Connections
- Server Push
To move to HTTP/2 we need to run over HTTPS. Within Go you can generate the certificates from code, but I do mine manually with the bundled generate_cert tool:
go run /usr/lib/go/src/crypto/tls/generate_cert.go -host localhost
Then to use it you just need to change ListenAndServe to ListenAndServeTLS and add the cert and key. I did not have problems using the self-signed certs, but here is what some people have done to skip verification on the client side.
package data
import (
"crypto/tls"
"flag"
"net/http"
)
var dataServiceUrl = flag.String("dataservice", "https://localhost:4000", "Address of the data service provider")
func init() {
tr := http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
http.DefaultClient = &http.Client{Transport: &tr}
}
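On the server side the change itself is tiny; a minimal sketch, reusing the ":3000" listener and GzipHandler from earlier and assuming cert.pem and key.pem sit next to the binary:
// ListenAndServeTLS is enough: Go's net/http negotiates HTTP/2 automatically over TLS.
go http.ListenAndServeTLS(":3000", "cert.pem", "key.pem", new(util.GzipHandler))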
Creating a Load Balancer
Introduction
To implement a load balancer we need
- Load Balancer
- Provider Registration
Load Balancer
This is a very simple load balancer to start. Each web request coming in carries its request, its response writer and a done channel.
type webRequest struct {
r *http.Request
w http.ResponseWriter
doneCh chan struct{}
}
Two goroutines are launched, one to process the requests and one to listen for them.
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
doneCh := make(chan struct{})
requestCh <- &webRequest{r: r, w: w, doneCh: doneCh}
<-doneCh
})
go processRequests()
go http.ListenAndServeTLS(":2000", "cert.pem", "key.pem", nil)
log.Println("Server started, press <ENTER> to exit")
fmt.Scanln()
}
The process keeps a list of app servers, the current index and a client
var (
appservers = []string{}
currentIndex = 0
client = http.Client{Transport: &transport}
)
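The request channel, the registration channels and the transport behind that client are not shown in the listing; a minimal sketch of how they might be declared (the heartbeat interval is an assumption, and it needs the crypto/tls, net/http and time imports):
var (
	// incoming web requests pushed by the HTTP handler
	requestCh = make(chan *webRequest)
	// registration and deregistration messages in "host:port" form (used further down)
	registerCh   = make(chan string)
	unregisterCh = make(chan string)
	// drives the periodic heartbeat checks
	heartbeatCh = time.Tick(10 * time.Second)
	// the app servers use self-signed certificates, so skip verification
	transport = http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}
)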
The processRequests function listens for requests on the channel and increments the index so that the next server is used.
func processRequests() {
for {
select {
case request := <-requestCh:
println("request")
if len(appservers) == 0 {
request.w.WriteHeader(http.StatusInternalServerError)
request.w.Write([]byte("No app servers found"))
request.doneCh <- struct{}{}
continue
}
currentIndex++
if currentIndex == len(appservers) {
currentIndex = 0
}
host := appservers[currentIndex]
go processRequest(host, request)
}
}
}
Finally the requests are processed.
- Incoming Request is copied to a new app server request
- Request is processed
- Response headers are copied back to the original response writer
- Incoming Request is marked as done
func processRequest(host string, request *webRequest) {
hostURL, _ := url.Parse(request.r.URL.String())
hostURL.Scheme = "https"
hostURL.Host = host
println(host)
println(hostURL.String())
req, _ := http.NewRequest(request.r.Method, hostURL.String(), request.r.Body)
for k, v := range request.r.Header {
values := ""
for _, headerValue := range v {
values += headerValue + " "
}
req.Header.Add(k, values)
}
	resp, err := client.Do(req)
	if err != nil {
		request.w.WriteHeader(http.StatusInternalServerError)
		request.doneCh <- struct{}{}
		return
	}
	// make sure the app server's response body is always released
	defer resp.Body.Close()
for k, v := range resp.Header {
values := ""
for _, headerValue := range v {
values += headerValue + " "
}
request.w.Header().Add(k, values)
}
io.Copy(request.w, resp.Body)
request.doneCh <- struct{}{}
}
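Note that joining multiple header values with a space is a simplification; a more faithful copy would call Header.Add once per value, which matters for multi-valued headers such as Set-Cookie on the response side.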
Provider Registration
Provider registration consists of
- Service Discovery
- Service Termination
- Heartbeat Checking
Service Discovery
This is how a service makes the load balancer aware of itself. When we implement this we need to think about whether each server handles all of the services or just some of them.
Service Termination
This is where the service lets the load balancer know it is terminating. We need to consider any in-flight transactions.
Heartbeat Checking
This is a poll by the load balancer to ensure each service is still available.
Load Balancer Implementation
For the load balancer, the processing of registration and deregistration was done on a separate port so that it is not exposed to the public.
Processing requests
In the main function a new listener is created and a new handler which handles the register and unregister.
go http.ListenAndServeTLS(":2001", "cert.pem", "key.pem", new(appserverHandler))
...
type appserverHandler struct{}
func (h *appserverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ip := strings.Split(r.RemoteAddr, ":")[0]
port := r.URL.Query().Get("port")
switch r.URL.Path {
case "/register":
registerCh <- ip + ":" + port
case "/unregister":
unregisterCh <- ip + ":" + port
}
}
Within the existing processRequests function, the register, unregister and heartbeat cases are implemented.
Register
This checks if it already exists. If it does not then it is appended to the available servers.
case host := <-registerCh:
println("register " + host)
isFound := false
for _, h := range appservers {
if host == h {
isFound = true
break
}
}
if !isFound {
appservers = append(appservers, host)
}
Unregister
The unregister case simply removes the host from the list of strings.
case host := <-unregisterCh:
println("unregister " + host)
for i := len(appservers) - 1; i >= 0; i-- {
if appservers[i] == host {
appservers = append(appservers[:i], appservers[i+1:]...)
}
}
Heartbeat
The heartbeat check runs in a goroutine so it does not block the main processing loop. It simply issues a GET to /ping and, where no valid response comes back, it sends an unregister.
case <-heartbeatCh:
println("heartbeat")
servers := appservers[:]
go func(servers []string) {
for _, server := range servers {
resp, err := http.Get("https://" + server + "/ping")
if err != nil || resp.StatusCode != 200 {
unregisterCh <- server
}
}
}(servers)
App Server Implementation
Introduction
This is a poor way to do this and we should look at a way to improve this.
Register
Let's add a flag, which is a variable that can be overridden at runtime, for the load balancer address. Then, after starting the server, wait a second to make sure it has started (aaarrrrgghhh) and send the register call to the load balancer.
var loadbalancerURL = flag.String("loadbalancer", "https://172.18.0.12:2001", "Address of the load balancer")
...
time.Sleep(1 * time.Second)
http.Get(*loadbalancerURL + "/register?port=3000")
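The default can then be overridden when the app server is started, for example (assuming flag.Parse() is called in main and the binary is called appserver):
./appserver -loadbalancer=https://172.18.0.12:2001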
Unregister
To unregister we just call the unregister route after the server has ended.
http.Get(*loadbalancerURL + "/unregister?port=3000")
Heartbeat
The heartbeat simply responds with a 200.
http.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
Problems
We probably want to look at how to stop pinging of servers which are in the process of being deregistered.
Caching
Introduction
As always, the first thing to do is to look at what is going on and understand the problem. If you don't, you have nothing to compare against.
The original approach took 590ms per transaction.
Introducing the load balancer increased this to 737ms, but it also increased the capacity we can add, as each extra app server only puts load on the load balancer.
If we now introduce a cache we can reduce the 737ms down to 447ms, which means we are faster than the original. This is a great way to sell it to the business, but of course the question might be "could you not just put a cache in the original solution?" Sigh!
Approach
So the approach is to build a cache service which supports
- Storing data
- Expiration of data
- Invalidation of data
Caching Service Implementation
Main Function
As ever, get the high level right first. Let's start a service which supports two routes, "/" and "/invalidate". The "/" handler handles either a get from or a save to the cache. Secondly we create a purge function.
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
getFromCache(w, r)
} else if r.Method == http.MethodPost {
saveToCache(w, r)
}
})
http.HandleFunc("/invalidate", invalidateEntry)
go http.ListenAndServeTLS(":5000", "cert.pem", "key.pem", nil)
go purgeCache()
log.Println("Caching service started, press <ENTER> to exit")
fmt.Scanln()
}
Data
We create a structure for the cache entry, which consists of the data and the expiry time.
type cacheEntry struct {
data []byte
expiration time.Time
}
Secondly we need a map for the cache, a mutex to avoid concurrency issues, a ticker channel to drive our purge function and a regular expression to extract the max age.
var (
cache = make(map[string]*cacheEntry)
mutex = sync.RWMutex{}
tickCh = time.Tick(5 * time.Second)
)
var maxAgeRexexp = regexp.MustCompile(`maxage=(\d+)`)
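For example, a cache-control value of maxage=86400 yields matches[1] == "86400", i.e. the number of seconds to keep the entry.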
Get From Cache
This reads from the cache. It takes a read lock to stop other functions deleting while we are reading, looks for the entry and sends it back if found, otherwise returning not found. Pretty simple.
func getFromCache(w http.ResponseWriter, r *http.Request) {
mutex.RLock()
defer mutex.RUnlock()
key := r.URL.Query().Get("key")
fmt.Printf("Searching cache for %s...", key)
if entry, ok := cache[key]; ok {
fmt.Println("found")
w.Write(entry.data)
return
}
w.WriteHeader(http.StatusNotFound)
fmt.Println("not found")
}
Save to Cache
No surprises: lock to ensure one write at a time, get the max age and add the item to the cache with the calculated expiry.
func saveToCache(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
key := r.URL.Query().Get("key")
cacheHeader := r.Header.Get("cache-control")
fmt.Printf("Saving cache entry with key '%s' for %s seconds\n", key, cacheHeader)
matches := maxAgeRexexp.FindStringSubmatch(cacheHeader)
if len(matches) == 2 {
dur, _ := strconv.Atoi(matches[1])
data, _ := ioutil.ReadAll(r.Body)
cache[key] = &cacheEntry{data: data, expiration: time.Now().Add(time.Duration(dur) * time.Second)}
}
}
Invalidate Entry
Lock and delete the key. Arguably a missing key should be reported as an error, but we get the picture.
func invalidateEntry(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
key := r.URL.Query().Get("key")
fmt.Printf("purging entry with key '%s'\n", key)
delete(cache, key)
}
Purge
A nice example of running a function every x seconds, no real surprises. Removes any cache entries which have expired.
func purgeCache() {
for range tickCh {
mutex.Lock()
now := time.Now()
fmt.Println("purging cache")
for k, v := range cache {
			if v.expiration.Before(now) {
fmt.Printf("purging entry with key '%s'\n", k)
delete(cache, k)
}
}
mutex.Unlock()
}
}
App Server Implementation
Implement Calls to the Service
We need to implement calls to the service. First we create a flag to allow the service address to be configured.
var cachServiceURL = flag.String("cacheservice", "https://172.18.0.13:5000", "Address of the caching service provider")
Get From Cache
We call the service with the key provided and either return nil if not found or the cached response body.
func getFromCache(key string) (io.ReadCloser, bool) {
resp, err := http.Get(*cachServiceURL + "/?key=" + key)
if err != nil || resp.StatusCode != http.StatusOK {
println("get fail")
return nil, false
}
return resp.Body, true
}
Save To Cache
We call the cache service to add an entry with the duration passed.
func saveToCache(key string, duration int64, data []byte) {
req, _ := http.NewRequest(http.MethodPost, *cachServiceURL+"/?key="+key,
bytes.NewBuffer(data))
req.Header.Add("cache-control", "maxage="+strconv.FormatInt(duration, 10))
http.DefaultClient.Do(req)
}
Invalidate Cache
Very simply, we send the key to the invalidate route.
func invalidateCacheEntry(key string) {
http.Get(*cachServiceURL + "/invalidate?key=" + key)
}
Implement In the Controller
We need to use these utility functions in the controller
- showBlogPostList
- showBlogPost
- updateBlogPost
In each case we need to check whether the response already exists in the cache.
Show Blog List
If we find it in the cache we return it,
cacheKey := url.QueryEscape(r.URL.RequestURI())
resp, ok := getFromCache(cacheKey)
if ok {
io.Copy(w, resp)
resp.Close()
return
}
Otherwise we build the response as we normally would and start a goroutine to add it to the cache.
...
go saveToCache(cacheKey, int64(24*time.Hour/time.Second), data[:]) // duration is in seconds
Show Blog Post
Again check if it exists
cacheKey := url.QueryEscape(r.URL.RequestURI())
resp, ok := getFromCache(cacheKey)
if ok {
io.Copy(w, resp)
resp.Close()
return
}
Otherwise we build the response as we normally would and start a goroutine to add it to the cache.
...
go saveToCache(cacheKey, int64(24*time.Hour/time.Second), data[:]) // duration is in seconds
Update Blog Post
We are updating so we now need to invalidate the original entry because we know it has changed
...
go invalidateCacheEntry(url.QueryEscape(r.RequestURI))
Centralized Logging
Introduction
This is probably best done with a package but is here just to demonstrate an approach in Go. It consists of a logging process and a helper function for the load balancer and app servers to share.
Logging Process
Log Entry Entity
This is the structure to contain the log entries
package entity
import "time"
type LogLevel string
const (
LogLevelInfo LogLevel = "INFO"
LogLevelError LogLevel = "ERROR"
LogLevelPanic LogLevel = "PANIC"
)
type LogEntry struct {
Level LogLevel
Timestamp time.Time
Source string
Message string
}
This structure is used as a slice to store the log entries. We provide Len, Swap and Less to satisfy sort.Interface and allow sorting on the slice.
type logEntries []entity.LogEntry
func (le logEntries) Len() int {
return len(le)
}
func (le logEntries) Swap(i, j int) {
le[i], le[j] = le[j], le[i]
}
func (le logEntries) Less(i, j int) bool {
return le[i].Timestamp.Before(le[j].Timestamp)
}
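These three methods mean logEntries satisfies sort.Interface, which is what allows the sort.Sort(entries) call later on. An optional compile-time check (not in the original, and it needs the sort import) would be:
var _ sort.Interface = logEntries{}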
Main Process
For the data and structures we have a mutex to ensure no issues with concurrency, a path for the log, a ticker channel to drive the processing and a delay factor to ensure slow entries have time to arrive.
Main Structures
var mutex sync.Mutex
var entries logEntries
const logPath = "/log/log.txt"
var tickCh = time.Tick(2 * time.Second)
var writeDelay = 2 * time.Second
Main Function
In the main function we register the handler, create the log file, start the web service and start the write-log process.
http.HandleFunc("/", storeEntry)
f, _ := os.Create(logPath)
f.Close()
go http.ListenAndServeTLS(":6000", "cert.pem", "key.pem", nil)
go writeLog()
log.Println("Log service started, press <ENTER> to exit")
fmt.Scanln()
Main Server Handler
The handler decodes the JSON, checking for errors; if there is no error the entry is appended to the slice.
func storeEntry(w http.ResponseWriter, r *http.Request) {
dec := json.NewDecoder(r.Body)
var entry entity.LogEntry
err := dec.Decode(&entry)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
mutex.Lock()
entries = append(entries, entry)
mutex.Unlock()
}
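For reference, the JSON body the handler expects looks roughly like this (field names come straight from the LogEntry struct, with the timestamp in Go's default RFC 3339 encoding):
{"Level":"INFO","Timestamp":"2021-01-30T01:13:00Z","Source":"load balancer","Message":"example message"}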
Main Server Write An Entry
This just formats the entry for us
func writeEntry(entry entity.LogEntry) string {
return fmt.Sprintf("%v;%v;%v;%v\n",
entry.Timestamp.Format("2006-01-02 15:04:05"),
entry.Level, entry.Source, entry.Message)
}
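For example, an INFO entry from the load balancer would come out roughly as:
2021-01-30 01:13:00;INFO;load balancer;example message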
Main Server Write Log
This is the worker loop: it runs forever, doing the work every time a tick is received.
- File is opened
- Timestamp is calculated
- Entries are sorted
- Processing entries until target date reached
func writeLog() {
for range tickCh {
mutex.Lock()
logFile, err := os.OpenFile(logPath, os.O_APPEND|os.O_WRONLY, 0664)
		if err != nil {
			fmt.Println(err)
			// unlock before skipping this tick, otherwise the mutex is never released
			mutex.Unlock()
			continue
		}
targetTime := time.Now().Add(-writeDelay)
sort.Sort(entries)
for i, entry := range entries {
if entry.Timestamp.Before(targetTime) {
_, err := logFile.WriteString(writeEntry(entry))
if err != nil {
fmt.Println(err)
}
if i == len(entries)-1 {
entries = logEntries{}
}
} else {
entries = entries[i:]
break
}
}
logFile.Close()
mutex.Unlock()
}
}
Logging Helper
To ensure the approach is not duplicated we provide helper functions to use the logging service. I guess you could argue that this is a service interface plus the WriteEntry.
var logserviceURL = flag.String("logservice", "https://172.18.0.14:6000",
"Address of the logging service")
var tr = http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
var client = &http.Client{Transport: &tr}
func WriteEntry(entry *entity.LogEntry) {
var buf bytes.Buffer
enc := json.NewEncoder(&buf)
enc.Encode(entry)
req, _ := http.NewRequest(http.MethodPost, *logserviceURL, &buf)
client.Do(req)
}
Logging Clients
So here is how to implement it, in this case in the load balancer.
go loghelper.WriteEntry(&entity.LogEntry{
Level: entity.LogLevelInfo,
Timestamp: time.Now(),
Source: "load balancer",
Message: "Registering application server with address: " + host,
})