Chapter 4 - Cloud Native Patterns
The Context Package
The context package was introduced in Go 1.7. It is built around a single interface, context.Context:
// Context carries a cancellation signal, an optional deadline, and
// request-scoped values. It mirrors the standard library's context.Context.
type Context interface {
	// Done returns a channel that's closed when this context is cancelled.
	Done() <-chan struct{}

	// Err indicates why this context was cancelled after the Done channel is
	// closed. If Done is not yet closed, Err returns nil.
	Err() error

	// Deadline returns the time when this context should be cancelled;
	// it returns ok == false if no deadline is set.
	Deadline() (deadline time.Time, ok bool)

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key. Use with care (see WithValue).
	Value(key interface{}) interface{}
}
creating context
func Background() Context
empty context, never cancelled, has no value, and has no deadline
func TODO() Context
also empty context, but it's intended to be used as a placeholder when it's unclear which context to use or it is not yet available.
defining context deadlines and timeouts
func WithDeadline(Context, time.Time) (Context, CancelFunc)
accepts a specific time at which the context will be cancelled and Done
will be closed.
func WithTimeout(Context, time.Duration) (Context, CancelFunc)
accepts a duration after which the context will be cancelled and Done
will be closed.
func WithCancel(Context) (Context, CancelFunc)
takes only the parent context and returns a function that can be called to cancel the derived context explicitly.
(When a context is cancelled, all contexts that are derived from it are also cancelled; the contexts that it was derived from are not.)
defining request-scoped values
func WithValue(parent Context, key, val interface{}) Context
returns a derivation of parent in which key
is associated with the value val
Use with caution: a Context is primarily a cancellation mechanism, not a general-purpose way to pass parameters.
using a context
// Stream calls ServiceCall under a 10-second timeout derived from ctx and then
// offers the result on out, over and over, until ctx is done.
//
// It returns the error from ServiceCall if that call fails; otherwise it
// returns ctx.Err() once ctx has been cancelled.
func Stream(ctx context.Context, out chan<- Value) error {
	// The derived context is cancelled when the timeout fires, but the parent
	// ctx is not. WithTimeout also returns a CancelFunc that must be called to
	// release the timer.
	dctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	res, err := ServiceCall(dctx)
	if err != nil {
		return err
	}

	for {
		select {
		case out <- res:
			// Delivered; loop around and offer the value again.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
Stability Patterns
Circuit Breaker
Participants:
- Circuit: the function that interacts with the service.
- Breaker: a closure with the same function signature as Circuit
Implementation:
// Circuit is the signature of a function that interacts with a service.
type Circuit func(context.Context) (string, error)

// Breaker wraps circuit in a closure with the same signature that stops
// calling circuit once it has failed failureThreshold times in a row.
//
// While the breaker is open the closure returns an error immediately. After a
// backoff that starts at 2 seconds and doubles with every further failure, a
// single trial call is let through; a success closes the breaker again.
//
// The returned closure keeps unsynchronized state and is not safe for
// concurrent use.
func Breaker(circuit Circuit, failureThreshold uint64) Circuit {
	var consecutiveFailures uint64
	lastAttempt := time.Now()

	return func(ctx context.Context) (string, error) {
		if consecutiveFailures >= failureThreshold {
			// Each failure beyond the threshold doubles the wait.
			backoffLevel := consecutiveFailures - failureThreshold
			shouldRetryAt := lastAttempt.Add((2 * time.Second) << backoffLevel)
			if !time.Now().After(shouldRetryAt) {
				return "", errors.New("circuit open -- service unreachable")
			}
		}

		lastAttempt = time.Now()
		response, err := circuit(ctx)
		if err != nil {
			// Count every failure, including the first one after a success,
			// so the breaker opens after exactly failureThreshold failures.
			consecutiveFailures++
			return response, err
		}

		consecutiveFailures = 0
		return response, nil
	}
}
Debounce
Participants:
- Circuit: the function to regulate.
- Debounce: a closure with the same function signature as Circuit
Implementation:
// Circuit is the signature of the function being regulated.
type Circuit func(context.Context) (string, error)

// DebounceLast wraps circuit in a closure with the same signature that
// postpones the real call until no invocation has arrived for d.
//
// Every invocation pushes the deadline back by d and immediately returns the
// most recently stored result (initially "" and nil). A background goroutine
// polls every 100ms; once the deadline has passed it calls circuit with the
// context of the invocation that started it, stores the outcome and exits. If
// that context is done first, it stores "" and ctx.Err() instead. The next
// invocation starts a new cycle.
//
// The closure is safe for concurrent use. Invocations block while circuit is
// running, because the lock is held for the duration of the call.
func DebounceLast(circuit Circuit, d time.Duration) Circuit {
	var (
		mu        sync.Mutex // guards everything below
		threshold time.Time
		ticker    *time.Ticker // non-nil while a watcher goroutine is running
		result    string
		err       error
	)

	// watch waits for the quiet period to elapse or for ctx to end, records
	// the outcome, and returns so the goroutine does not outlive its cycle.
	watch := func(ctx context.Context, t *time.Ticker) {
		// finish records the outcome and ends the cycle; mu must be held.
		finish := func(res string, e error) {
			result, err = res, e
			t.Stop()
			ticker = nil
		}

		for {
			select {
			case <-t.C:
				mu.Lock()
				if threshold.Before(time.Now()) {
					finish(circuit(ctx))
					mu.Unlock()
					return
				}
				mu.Unlock()
			case <-ctx.Done():
				mu.Lock()
				finish("", ctx.Err())
				mu.Unlock()
				return
			}
		}
	}

	return func(ctx context.Context) (string, error) {
		mu.Lock()
		defer mu.Unlock()

		threshold = time.Now().Add(d)
		if ticker == nil {
			ticker = time.NewTicker(100 * time.Millisecond)
			go watch(ctx, ticker)
		}
		return result, err
	}
}
Retry
Participants:
- Effector: the function that interacts with the service.
- Retry: a function that accepts Effector and returns a closure with the same function signature as Effector.
Implementation:
// Effector is the signature of a function that interacts with a service.
type Effector func(context.Context) (string, error)

// Retry returns an Effector that invokes effector until it succeeds, giving
// up after the initial attempt plus at most retries further attempts.
//
// Between attempts it waits for delay. If ctx is done during that wait, the
// closure returns "" and ctx.Err(). When the attempts are exhausted it returns
// whatever the final attempt returned.
func Retry(effector Effector, retries int, delay time.Duration) Effector {
	return func(ctx context.Context) (string, error) {
		for attempt := 1; ; attempt++ {
			out, callErr := effector(ctx)
			if callErr == nil {
				return out, nil
			}
			if attempt > retries {
				// No attempts left: surface the last failure unchanged.
				return out, callErr
			}

			log.Printf("Attempt %d failed; retrying in %v", attempt, delay)

			wait := time.After(delay)
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-wait:
			}
		}
	}
}
to use this:
// count records how many times EmulateTransientError has been called.
var count int

// EmulateTransientError is a demo Effector that fails on its first three
// calls and succeeds on every call after that. It ignores ctx and is not
// safe for concurrent use because it mutates the package-level count.
func EmulateTransientError(ctx context.Context) (string, error) {
	count++
	if count <= 3 {
		return "intenntial fail", errors.New("error")
	}
	return "success", nil
}
// main wraps EmulateTransientError so it is retried up to 5 times, 2 seconds
// apart, then prints the final result and error.
func main() {
	retrying := Retry(EmulateTransientError, 5, 2*time.Second)
	fmt.Println(retrying(context.Background()))
}
Throttle
Participants:
- Effector: the function to regulate.
- Throttle: a function that accepts Effector and returns a closure with the same function signature as Effector.
Implementation:
// Effector is the signature of the function being regulated.
type Effector func(context.Context) (string, error)

// Throttle returns an Effector that limits how often e is called, using a
// token bucket that starts with max tokens and gains refill tokens every d,
// never exceeding max.
//
// Each invocation that finds a token spends it and calls e. An invocation
// that finds the bucket empty does not call e; it returns the result of the
// most recent real call instead ("" and nil if there has been none). An
// invocation whose ctx is already done returns "" and ctx.Err().
//
// The refill goroutine starts on first use and runs until the ctx of the
// invocation that started it is done; a later invocation starts a new one.
// d must be positive because time.NewTicker panics otherwise. The closure is
// safe for concurrent use.
func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector {
	var (
		mu         sync.Mutex // guards everything below
		tokens     = max
		ticker     *time.Ticker // non-nil while a refill goroutine is running
		lastResult string
		lastErr    error
	)

	// refillLoop tops the bucket up on every tick until ctx is done.
	refillLoop := func(ctx context.Context, t *time.Ticker) {
		defer func() {
			mu.Lock()
			t.Stop()
			ticker = nil
			mu.Unlock()
		}()

		for {
			select {
			case <-t.C:
				mu.Lock()
				// Compare against the remaining room so that tokens+refill
				// cannot wrap around.
				if refill > max-tokens {
					tokens = max
				} else {
					tokens += refill
				}
				mu.Unlock()
			case <-ctx.Done():
				return
			}
		}
	}

	return func(ctx context.Context) (string, error) {
		if err := ctx.Err(); err != nil {
			return "", err
		}

		mu.Lock()
		if ticker == nil {
			ticker = time.NewTicker(d)
			go refillLoop(ctx, ticker)
		}
		if tokens == 0 {
			res, err := lastResult, lastErr
			mu.Unlock()
			return res, err
		}
		tokens--
		mu.Unlock()

		// Call e without holding the lock so slow calls do not block refills
		// or other callers.
		res, err := e(ctx)

		mu.Lock()
		lastResult, lastErr = res, err
		mu.Unlock()
		return res, err
	}
}
Timeout
Participants:
- Client: the client who wants to execute SlowFunction.
- SlowFunction: the long-running function that implements the functionality desired by Client.
- Timeout: a wrapper function around SlowFunction that implements the timeout logic.
Implementation:
In an ideal world:
// Start from the empty root context.
ctx := context.Background()
// Derive a context that is cancelled after 10 seconds.
ctxt, cancel := context.WithTimeout(ctx, 10 * time.Second)
// Make sure the derived context is cancelled when the enclosing function returns.
defer cancel()
// Hand the derived context to the callee, which is expected to honour it.
result, err := SomeFunction(ctxt)
But if a third-party function doesn't accept a Context value, create a Timeout wrapper function for it:
import (
	rando "gitplace.io/random-repo/random-package"
)
// Timeout starts rando.SlowFunction(arg) in a goroutine right away and
// returns a closure that waits for its outcome or for the closure's ctx to be
// done, whichever happens first.
//
// If ctx is done first, the closure returns "" and ctx.Err(). The outcome is
// delivered only once, so the closure is meant to be called a single time.
func Timeout(arg string) func(context.Context) (string, error) {
	// Buffered with room for one value each, so the worker can always deliver
	// its outcome and exit even if the caller gave up and nobody is receiving.
	chres := make(chan string, 1)
	cherr := make(chan error, 1)

	go func() {
		res, err := rando.SlowFunction(arg)
		chres <- res
		cherr <- err
	}()

	return func(ctx context.Context) (string, error) {
		select {
		case res := <-chres:
			return res, <-cherr
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}
usage
// main runs the wrapped slow call with a 2-second budget and prints whatever
// comes back: the result, or the context's error if the budget ran out.
func main() {
	ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	res, err := Timeout("some input value")(ctxt)
	fmt.Println(res, err)
}
finally, although it's usually preferred to implement service timeouts using context.Context
, channel timeouts can also be implemented using the time.After
function.
Concurrency Patterns
Fan-In
Participants:
- Sources: a set of one or more input channels with the same type. Accepted by Funnel.
- Destination: an output channel of the same type as Sources. created and provided by Funnel.
- Funnel: accepts Sources and immediately returns Destination. any input from any Sources will be output by Destination.
Implementation:
// Funnel multiplexes every value received from sources onto one returned
// channel. The returned channel is closed once all sources have been closed
// and drained.
func Funnel(sources ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup

	// forward copies one source into merged until that source is closed.
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}

	pending.Add(len(sources))
	for i := range sources {
		go forward(sources[i])
	}

	// Close merged only after every forwarder has finished.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
usage
// main starts three producers that each emit 1 through 5 at one-second
// intervals, merges them with Funnel, prints every value received, and prints
// "Done" once the merged channel is closed.
func main() {
	const producers = 3

	var sources []<-chan int
	for p := 0; p < producers; p++ {
		ch := make(chan int)
		sources = append(sources, ch)

		go func(out chan<- int) {
			defer close(out)
			for v := 1; v <= 5; v++ {
				out <- v
				time.Sleep(time.Second)
			}
		}(ch)
	}

	for d := range Funnel(sources...) {
		fmt.Println(d)
	}
	fmt.Println("Done")
}
Fan-Out
Participants:
- Source: an input channel. accepted by Split.
- Destinations: a set of output channels of the same type as Source. Created and provided by Split.
- Split: a function that accepts Source and immediately returns Destinations. Any input from Source will be output to one of the Destinations.
Implementation:
// Split distributes the values read from source across n newly created
// output channels and returns them. Each value goes to exactly one output,
// whichever worker receives it. All outputs are closed once source is closed
// and drained.
func Split(source <-chan int, n int) []<-chan int {
	outputs := []<-chan int{}

	// worker competes with its siblings for values from source.
	worker := func(out chan<- int) {
		defer close(out)
		for v := range source {
			out <- v
		}
	}

	for remaining := n; remaining > 0; remaining-- {
		ch := make(chan int)
		outputs = append(outputs, ch)
		go worker(ch)
	}
	return outputs
}
usage:
// main splits a source channel across five destinations, feeds it the numbers
// 1 through 10, and prints which destination received each value. It returns
// once every destination has been drained.
func main() {
	source := make(chan int)
	dests := Split(source, 5)

	go func() {
		for i := 1; i <= 10; i++ {
			source <- i
		}
		close(source)
	}()

	var wg sync.WaitGroup
	wg.Add(len(dests))
	for i, ch := range dests {
		go func(i int, d <-chan int) {
			defer wg.Done()
			for val := range d {
				// Printf, not Println: the first argument is a format string.
				fmt.Printf("#%d got %d\n", i, val)
			}
		}(i, ch)
	}
	wg.Wait()
}
Future
Participants:
- Future: the interface that is received by the consumer to retrieve the eventual result.
- SlowFunction: a wrapper function around some function to be asynchronously executed; provides Future.
- InnerFuture: satisfies the Future interface; includes an attached method that contains the result access logic.
Implementation:
This pattern isn't used as frequently in Go as in some other languages because channels can often be used in a similar way. For example:
// InverseProduct returns the product of the inverses of a and b. Both
// inversions are started before either result is awaited, so they can run
// concurrently.
func InverseProduct(a, b Matrix) Matrix {
	// Inverse already returns a receive-only channel that will carry the
	// result, so it is used directly as the future.
	inva := Inverse(a)
	invb := Inverse(b)
	return Product(<-inva, <-invb)
}
// Inverse starts BlockingInverse(m) in a goroutine and immediately returns a
// channel that delivers the single result and is then closed.
func Inverse(m Matrix) <-chan Matrix {
	out := make(chan Matrix)
	go func() {
		out <- BlockingInverse(m)
		close(out)
	}()
	return out
}
// Future is the interface a consumer receives in order to retrieve the
// eventual result of an asynchronous operation.
type Future interface {
// Result returns the operation's string result and error.
Result() (string, error)
}
// InnerFuture provides a Result method with the signature required by Future.
// It reads one value from each of its two channels the first time Result is
// called and caches them for later calls.
type InnerFuture struct {
	once sync.Once      // ensures the channels are read only once
	wg   sync.WaitGroup // lets callers wait for that read to finish

	res string // cached result, valid after the first read
	err error  // cached error, valid after the first read

	resCh <-chan string
	errCh <-chan error
}

// Result blocks until the result is available and returns it. The channels
// are read on the first call only; every later call returns the cached values.
func (f *InnerFuture) Result() (string, error) {
	f.once.Do(func() {
		f.wg.Add(1)
		defer f.wg.Done()
		f.res = <-f.resCh
		f.err = <-f.errCh
	})

	f.wg.Wait()

	return f.res, f.err
}
// SlowFunction simulates a slow operation and returns a Future for its
// outcome without blocking. The outcome is "I slept for 2 seconds" and a nil
// error after two seconds, or "" and ctx.Err() if ctx is done first.
func SlowFunction(ctx context.Context) Future {
	// Buffered with room for one value each, so the goroutine can finish even
	// if the caller never asks the Future for its result.
	resCh := make(chan string, 1)
	errCh := make(chan error, 1)

	go func() {
		select {
		case <-time.After(time.Second * 2):
			resCh <- "I slept for 2 seconds"
			errCh <- nil
		case <-ctx.Done():
			resCh <- ""
			errCh <- ctx.Err()
		}
	}()

	return &InnerFuture{resCh: resCh, errCh: errCh}
}
usage
// main starts SlowFunction, waits for its Future to resolve, and prints
// either the error or the result.
func main() {
	ctx := context.Background()
	future := SlowFunction(ctx)

	res, err := future.Result()
	if err != nil {
		fmt.Println("error:", err)
		return
	}

	// Use the result: an unused local is a compile error in Go.
	fmt.Println(res)
}
Sharding
Participants:
- ShardedMap: an abstraction around one or more Shards providing read and write access as if the Shards were a single map.
- Shard: an individually-lockable collection representing a single data partition.
Implementation:
// Shard is an individually lockable collection holding one partition of the
// data.
type Shard struct {
// Embedded lock; its Lock/RLock methods are promoted onto Shard.
sync.RWMutex
// m holds this shard's key/value pairs.
m map[string]interface{}
}
// ShardedMap is a slice of Shards that is accessed as if it were a single map.
type ShardedMap []*Shard
// NewSharedMap builds a ShardedMap made of nshards shards, each with its own
// empty, ready-to-use map.
func NewSharedMap(nshards int) ShardedMap {
	sm := make(ShardedMap, nshards)
	for i := range sm {
		sm[i] = &Shard{m: map[string]interface{}{}}
	}
	return sm
}
// getShardIndex maps key to the index of the shard responsible for it. The
// same key always yields the same index for a given number of shards.
//
// It panics with a division by zero if m has no shards.
func (m ShardedMap) getShardIndex(key string) int {
	checksum := sha1.Sum([]byte(key))
	hash := int(checksum[17]) // one arbitrary byte of the digest is the hash
	return hash % len(m)      // reduce modulo the shard count to get an index
}
// getShard returns the shard that key is assigned to.
func (m ShardedMap) getShard(key string) *Shard {
	return m[m.getShardIndex(key)]
}
Because a single byte is used as the hash value, this can address at most 256 shards (byte values 0 through 255). If you want more than that, you can sprinkle some binary arithmetic on it: hash := int(checksum[13])<<8 | int(checksum[17])
// Get returns the value stored under key, or nil if the key is absent. Only
// the shard that owns key is read-locked for the lookup.
func (m ShardedMap) Get(key string) interface{} {
	owner := m.getShard(key)

	owner.RLock()
	value := owner.m[key]
	owner.RUnlock()

	return value
}
// Set stores value under key, replacing any previous value. Only the shard
// that owns key is write-locked for the update.
func (m ShardedMap) Set(key string, value interface{}) {
	owner := m.getShard(key)

	owner.Lock()
	defer owner.Unlock()

	owner.m[key] = value
}
// Keys returns every key currently stored in the map, in no particular order.
// The shards are scanned concurrently, each under its own read lock, so the
// result is not a snapshot of a single moment if writers are active.
func (m ShardedMap) Keys() []string {
	var (
		mu   sync.Mutex // guards keys while the per-shard goroutines merge into it
		wg   sync.WaitGroup
		keys = make([]string, 0)
	)

	wg.Add(len(m))
	for _, shard := range m {
		go func(s *Shard) {
			defer wg.Done()

			// Collect this shard's keys privately so the shard lock is held
			// only for the scan.
			s.RLock()
			local := make([]string, 0, len(s.m))
			for key := range s.m {
				local = append(local, key)
			}
			s.RUnlock()

			// Appending to the shared slice from several goroutines is a data
			// race unless it is serialized.
			mu.Lock()
			keys = append(keys, local...)
			mu.Unlock()
		}(shard)
	}
	wg.Wait()

	return keys
}
usage
// main stores three entries in a five-shard map, prints each stored value,
// and then prints every key the map reports.
func main() {
	shardedMap := NewSharedMap(5)

	entries := []struct {
		key   string
		value int
	}{
		{"alpha", 1},
		{"beta", 2},
		{"gamma", 3},
	}

	for _, e := range entries {
		shardedMap.Set(e.key, e.value)
	}
	for _, e := range entries {
		fmt.Println(shardedMap.Get(e.key))
	}

	for _, k := range shardedMap.Keys() {
		fmt.Println(k)
	}
}