Jim Cheung

Chapter 4 - Cloud Native Patterns

The Context Package

The context package was introduced in Go 1.7. It is built around a single interface, context.Context:

// Context carries a cancellation signal, an optional deadline, and
// request-scoped values. The method set mirrors the standard library's
// context.Context interface.
type Context interface {
    // Done returns a channel that's closed when this context is cancelled.
    Done() <-chan struct{}

    // Err indicates why this context was cancelled, after the Done channel
    // is closed. If Done is not yet closed, Err returns nil.
    Err() error

    // Deadline returns the time when this context should be cancelled;
    // it returns ok == false if no deadline is set.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with this context for key,
    // or nil if no value is associated with key. Use with care (see WithValue).
    Value(key interface{}) interface{}
}

creating context

func Background() Context

empty context, never cancelled, has no value, and has no deadline

func TODO() Context

also empty context, but it's intended to be used as a placeholder when it's unclear which context to use or it is not yet available.

defining context deadlines and timeouts

func WithDeadline(Context, time.Time) (Context, CancelFunc)

accepts a specific time at which the context will be cancelled and Done will be closed.

func WithTimeout(Context, time.Duration) (Context, CancelFunc)

accepts a duration after which the context will be cancelled and Done will be closed.

func WithCancel(Context) (Context, CancelFunc)

takes no deadline or timeout; it returns a function that can be called to cancel the context explicitly

(when a context is cancelled, all contexts that are derived from it are also cancelled; the contexts that it was derived from are not)

defining request-scoped values

func WithValue(parent Context, key, val interface{}) Context

returns a derivation of parent in which key is associated with the value val

use with caution: Context is primarily meant for cancellation, not for passing arbitrary data around

using a context

// Stream calls ServiceCall under a derived 10-second timeout and then sends
// the result on out over and over until ctx is cancelled.
//
// It returns the error from ServiceCall if that call fails; otherwise it
// returns ctx.Err() once ctx is done.
func Stream(ctx context.Context, out chan<- Value) error {
    // dctx is cancelled when the timeout elapses (or when ctx is cancelled);
    // the timeout does not affect ctx itself.
    dctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    // Release the timer held by dctx as soon as Stream returns.
    defer cancel()

    res, err := ServiceCall(dctx)
    if err != nil {
        return err
    }

    for {
        select {
        case out <- res:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

Stability Patterns

Circuit Breaker

Participants:

Implementation:

// Circuit is the signature of the function that a circuit breaker wraps:
// it takes a context and returns a string result or an error.
type Circuit func(context.Context) (string, error)

// Breaker wraps circuit with a circuit breaker.
//
// Every failed call increments a consecutive-failure counter and every
// successful call resets it. Once the counter reaches failureThreshold the
// circuit is open: calls fail immediately with "circuit open -- service
// unreachable" and do not invoke circuit until a backoff period has elapsed
// since the last real attempt. The backoff starts at 2 seconds and doubles
// with each further failure.
//
// With failureThreshold == 0 the circuit starts out open for the first
// backoff period. The returned function is not safe for concurrent use.
func Breaker(circuit Circuit, failureThreshold uint64) Circuit {
    // maxBackoffLevel bounds the shift below so that (2s << level) cannot
    // overflow time.Duration and wrap around to a retry time in the past.
    const maxBackoffLevel = 30

    var consecutiveFailures uint64
    lastAttempt := time.Now()

    return func(ctx context.Context) (string, error) {
        if consecutiveFailures >= failureThreshold {
            backoffLevel := consecutiveFailures - failureThreshold
            if backoffLevel > maxBackoffLevel {
                backoffLevel = maxBackoffLevel
            }
            shouldRetryAt := lastAttempt.Add((2 * time.Second) << backoffLevel)

            if !time.Now().After(shouldRetryAt) {
                return "", errors.New("circuit open -- service unreachable")
            }
        }

        lastAttempt = time.Now()
        response, err := circuit(ctx)
        if err != nil {
            // Count every failure, including the first one after a success.
            consecutiveFailures++
            return response, err
        }

        consecutiveFailures = 0

        return response, nil
    }
}

Debounce

Participants:

Implementation:

// Circuit is the signature of the function being debounced: it takes a
// context and returns a string result or an error.
type Circuit func(context.Context) (string, error)

// DebounceLast wraps circuit so that a burst of calls results in a single
// invocation of circuit, made once no call has arrived for at least d.
//
// Every call pushes the firing time back to now+d and immediately returns the
// cached result of the most recent invocation (the zero values before the
// first one). The first call of a burst starts a goroutine that polls every
// 100ms; it invokes circuit with that first call's ctx once the firing time
// has passed, or records ctx.Err() if that ctx is done first. Either way the
// goroutine then exits, and the next call starts a new burst.
//
// circuit runs while the internal lock is held, so calls made during an
// invocation wait for it to finish and then start the next burst.
func DebounceLast(circuit Circuit, d time.Duration) Circuit {
    var (
        mu        sync.Mutex // guards every variable below
        threshold time.Time
        ticker    *time.Ticker
        result    string
        err       error
    )

    return func(ctx context.Context) (string, error) {
        mu.Lock()
        defer mu.Unlock()

        threshold = time.Now().Add(d)

        if ticker == nil {
            t := time.NewTicker(time.Millisecond * 100)
            ticker = t

            go func() {
                defer t.Stop()

                for {
                    select {
                    case <-t.C:
                        mu.Lock()
                        if threshold.Before(time.Now()) {
                            result, err = circuit(ctx)
                            ticker = nil
                            mu.Unlock()
                            // return, not break: break would only leave the select.
                            return
                        }
                        mu.Unlock()
                    case <-ctx.Done():
                        mu.Lock()
                        result, err = "", ctx.Err()
                        ticker = nil
                        mu.Unlock()
                        return
                    }
                }
            }()
        }

        return result, err
    }
}

Retry

Participants:

Implementation:

// Effector is the signature of the function that Retry wraps: it takes a
// context and returns a string result or an error.
type Effector func(context.Context) (string, error)

// Retry wraps effector so that a failed call is repeated up to retries more
// times, waiting delay between attempts.
//
// The wrapper returns the first successful result, or the result and error of
// the final attempt once the retries are used up. If ctx is done while
// waiting between attempts it returns "" and ctx.Err().
func Retry(effector Effector, retries int, delay time.Duration) Effector {
    return func(ctx context.Context) (string, error) {
        attempt := 0

        for {
            response, err := effector(ctx)
            if err == nil {
                return response, nil
            }
            if attempt >= retries {
                return response, err
            }

            log.Printf("Attempt %d failed; retrying in %v", attempt+1, delay)

            // Wait out the delay unless the context ends first.
            select {
            case <-ctx.Done():
                return "", ctx.Err()
            case <-time.After(delay):
            }

            attempt++
        }
    }
}

to use this:

// count records how many times EmulateTransientError has been called.
var count int

// EmulateTransientError simulates a service that recovers after a transient
// fault: its first three calls return an error and every later call returns
// "success". ctx is accepted to match the Effector signature and is not used.
func EmulateTransientError(ctx context.Context) (string, error) {
    count++

    if count <= 3 {
        return "intentional fail", errors.New("error")
    }

    return "success", nil
}

// main wraps EmulateTransientError in Retry (up to 5 retries, 2 seconds
// apart) and prints the result and error of a single call.
func main() {
    retrying := Retry(EmulateTransientError, 5, 2*time.Second)

    fmt.Println(retrying(context.Background()))
}

Throttle

Participants:

Implementation:

// Effector is the signature of the function that Throttle wraps: it takes a
// context and returns a string result or an error.
type Effector func(context.Context) (string, error)

// Throttle wraps e with a token bucket. The bucket starts with max tokens
// and gains refill tokens every d, never exceeding max.
//
// A call made while a token is available consumes it, invokes e and returns
// (and caches) its result. A call made with no tokens left returns the cached
// result of the most recent invocation without calling e. A call with an
// already-done ctx returns "" and ctx.Err().
//
// Refilling is done by a goroutine started on demand and bound to the ctx of
// the call that started it; when that ctx is done the goroutine stops its
// ticker and exits, and the next call starts a new one. d must be positive
// (time.NewTicker panics otherwise).
func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector {
    var (
        mu        sync.Mutex // guards every variable below
        refilling bool
        tokens    = max

        lastReturnString string
        lastReturnError  error
    )

    // startRefill launches the refill goroutine. mu must be held by the caller.
    startRefill := func(ctx context.Context) {
        refilling = true
        ticker := time.NewTicker(d)

        go func() {
            defer func() {
                ticker.Stop()
                mu.Lock()
                refilling = false
                mu.Unlock()
            }()

            for {
                select {
                case <-ticker.C:
                    mu.Lock()
                    // tokens <= max always holds, so max-tokens cannot
                    // underflow and the addition below cannot overflow.
                    if refill >= max-tokens {
                        tokens = max
                    } else {
                        tokens += refill
                    }
                    mu.Unlock()
                case <-ctx.Done():
                    // return, not break: break would only leave the select.
                    return
                }
            }
        }()
    }

    return func(ctx context.Context) (string, error) {
        if err := ctx.Err(); err != nil {
            return "", err
        }

        mu.Lock()
        if !refilling {
            startRefill(ctx)
        }
        if tokens == 0 {
            res, err := lastReturnString, lastReturnError
            mu.Unlock()
            return res, err
        }
        tokens--
        mu.Unlock()

        // Run the effector outside the lock so a slow call does not block
        // the refill goroutine or other callers.
        res, err := e(ctx)

        mu.Lock()
        lastReturnString, lastReturnError = res, err
        mu.Unlock()

        return res, err
    }
}

Timeout

Participants:

Implementation:

In an ideal world:

// Start from the root context and derive one that is cancelled after 10 seconds.
ctx := context.Background()
ctxt, cancel := context.WithTimeout(ctx, 10 * time.Second)
// Release the derived context's resources when the surrounding function returns.
defer cancel()

// SomeFunction receives the derived context, so the 10-second limit applies to it.
result, err := SomeFunction(ctxt)

but if a 3rd-party function doesn't accept a Context value, you can wrap it in a Timeout function:

import {
    rando "gitplace.io/random-repo/random-package"
}

// Timeout starts rando.SlowFunction(arg) in a goroutine right away and
// returns a function that waits for its outcome under a context.
//
// The returned function yields SlowFunction's result and error, or "" and
// ctx.Err() if ctx is done first. The outcome is delivered only once: a
// second call finds nothing to receive and returns when its ctx is done.
func Timeout(arg string) func(context.Context) (string, error) {
    // Buffered so the worker can always deliver its outcome and exit, even
    // when the caller has already given up because ctx expired.
    chres := make(chan string, 1)
    cherr := make(chan error, 1)

    go func() {
        res, err := rando.SlowFunction(arg)
        chres <- res
        cherr <- err
    }()

    return func(ctx context.Context) (string, error) {
        select {
        case res := <-chres:
            return res, <-cherr
        case <-ctx.Done():
            return "", ctx.Err()
        }
    }
}

usage

// main runs the slow function through Timeout with a 2-second limit and
// prints the result and error.
func main() {
    ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    fmt.Println(Timeout("some input value")(ctxt))
}

finally, although it's usually preferred to implement service timeouts using context.Context, channel timeouts can also be implemented using the time.After function.

Concurrency Patterns

Fan-In

Participants:

Implementation:

// Funnel merges every value received from sources into one channel (fan-in).
// The returned channel is closed once all source channels have been closed
// and drained.
func Funnel(sources ...<-chan int) <-chan int {
    merged := make(chan int)

    var pending sync.WaitGroup

    // forward copies one source into merged until that source is closed.
    forward := func(src <-chan int) {
        defer pending.Done()

        for v := range src {
            merged <- v
        }
    }

    pending.Add(len(sources))
    for _, src := range sources {
        go forward(src)
    }

    // Close merged only after every forwarder has finished.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}

usage

// main starts three producers that each emit 1..5 at one-second intervals,
// funnels them into a single channel, prints every value received and then
// prints "Done" once all producers have closed their channels.
func main() {
    var sources []<-chan int

    for producer := 0; producer < 3; producer++ {
        ch := make(chan int)
        sources = append(sources, ch)

        go func() {
            defer close(ch)

            for n := 1; n <= 5; n++ {
                ch <- n
                time.Sleep(time.Second)
            }
        }()
    }

    for v := range Funnel(sources...) {
        fmt.Println(v)
    }

    fmt.Println("Done")
}

Fan-Out

Participants:

Implementation:

// Split distributes the values read from source across n output channels
// (fan-out). Each output is fed by its own goroutine competing to read from
// source, so every value goes to exactly one output. All outputs are closed
// once source is closed and drained.
func Split(source <-chan int, n int) []<-chan int {
    outputs := []<-chan int{}

    // relay copies values from source to out until source is closed.
    relay := func(out chan<- int) {
        defer close(out)

        for v := range source {
            out <- v
        }
    }

    for len(outputs) < n {
        out := make(chan int)
        outputs = append(outputs, out)

        go relay(out)
    }

    return outputs
}

usage:

// main feeds the numbers 1..10 into a source channel, splits it across five
// destinations and prints which destination received each value. It returns
// once every destination channel has been closed and drained.
func main() {
    source := make(chan int)
    dests := Split(source, 5)

    go func() {
        for i := 1; i <= 10; i++ {
            source <- i
        }

        close(source)
    }()

    var wg sync.WaitGroup
    wg.Add(len(dests))

    for i, ch := range dests {
        go func(i int, d <-chan int) {
            defer wg.Done()

            for val := range d {
                // Printf, not Println: the string contains format verbs.
                fmt.Printf("#%d got %d\n", i, val)
            }
        }(i, ch)
    }

    wg.Wait()
}

Future

Participants:

Implementation:

this pattern isn't used as frequently in Go as in some other languages because channels can often be used in a similar way. For example:

// InverseProduct inverts a and b concurrently and returns the product of the
// two inverses.
//
// Inverse starts each inversion in its own goroutine and returns a channel
// that will carry the result, so both inversions are in flight before either
// result is awaited.
func InverseProduct(a, b Matrix) Matrix {
    inva := Inverse(a)
    invb := Inverse(b)

    return Product(<-inva, <-invb)
}

// Inverse starts BlockingInverse(m) in a goroutine and immediately returns a
// channel that delivers the single result and is then closed.
func Inverse(m Matrix) <-chan Matrix {
    out := make(chan Matrix)

    go func() {
        out <- BlockingInverse(m)
        close(out)
    }()

    return out
}
// Future stands in for a (string, error) outcome that is obtained through
// Result. In this file it is implemented by *InnerFuture.
type Future interface {
    // Result returns the outcome's value and error.
    Result() (string, error)
}

// InnerFuture is a Future whose outcome arrives over a pair of channels.
// Result reads resCh and errCh once and stores what it received in res and
// err, which it returns on that and every later call.
type InnerFuture struct {
    // once ensures the channels are read a single time.
    once sync.Once
    // wg is waited on by Result before it returns the stored outcome.
    wg   sync.WaitGroup

    // res and err hold the outcome after the channels have been read.
    res   string
    err   error
    // resCh and errCh deliver the outcome; Result receives from resCh first.
    resCh <-chan string
    errCh <-chan error
}

func (f *InnerFuture) Result() {string, error) {
    f.once.Do(func() {
        f.wg.Add(1)
        defer f.wg.Done()
        f.res = <-f.resCh
        f.err = <-f.errCh
    })

    f.wg.Wait()

    return f.res, f.err
}

// SlowFunction simulates a slow operation and returns a Future for its
// outcome without blocking.
//
// The outcome is "I slept for 2 seconds" with a nil error after two seconds,
// or "" with ctx.Err() if ctx is done first.
func SlowFunction(ctx context.Context) Future {
    // Buffered so the worker can deliver the outcome and exit even if the
    // caller never asks the Future for its result.
    resCh := make(chan string, 1)
    errCh := make(chan error, 1)

    go func() {
        select {
        case <-time.After(time.Second * 2):
            resCh <- "I slept for 2 seconds"
            errCh <- nil
        case <-ctx.Done():
            resCh <- ""
            errCh <- ctx.Err()
        }
    }()

    return &InnerFuture{resCh: resCh, errCh: errCh}
}

usage

// main starts SlowFunction, waits for its Future to resolve and prints either
// the error or the result.
func main() {
    ctx := context.Background()
    future := SlowFunction(ctx)

    res, err := future.Result()
    if err != nil {
        fmt.Println("error:", err)
        return
    }

    // Use the result: an unused local is a compile error in Go.
    fmt.Println(res)
}

Sharding

Participants:

Implementation:

// Shard is one partition of a ShardedMap: a map together with the lock that
// guards it.
type Shard struct {
    // Embedded lock: Get and Keys take the read lock, Set the write lock.
    sync.RWMutex
    // m holds this shard's key/value pairs.
    m map[string]interface{}
}

// ShardedMap is a string-keyed map split across several independently locked
// shards; each key belongs to the shard selected by getShardIndex.
type ShardedMap []*Shard

// NewSharedMap builds a ShardedMap with nshards shards, each starting with
// its own empty map.
func NewSharedMap(nshards int) ShardedMap {
    shards := make([]*Shard, nshards)

    for i := range shards {
        shards[i] = &Shard{m: make(map[string]interface{})}
    }

    return shards
}

// getShardIndex returns the index of the shard responsible for key: one byte
// of the key's SHA-1 checksum, reduced modulo the number of shards.
//
// It panics with a division by zero if m has no shards.
func (m ShardedMap) getShardIndex(key string) int {
    checksum := sha1.Sum([]byte(key))
    hash := int(checksum[17]) // pick an arbitrary byte as the hash

    return hash % len(m) // mod by the shard count to get the index
}

// getShard returns the shard that key maps to.
func (m ShardedMap) getShard(key string) *Shard {
    return m[m.getShardIndex(key)]
}

because it uses a single byte as the hash value, it can address at most 256 shards (a byte has only 256 distinct values). If you want more than that, you can sprinkle some binary arithmetic on it: hash := int(checksum[13]) << 8 | int(checksum[17])

// Get returns the value stored under key, or nil if the key is absent. Only
// the shard owning key is read-locked for the lookup.
func (m ShardedMap) Get(key string) interface{} {
    owner := m.getShard(key)

    owner.RLock()
    value := owner.m[key]
    owner.RUnlock()

    return value
}

// Set stores value under key, replacing any previous value. Only the shard
// owning key is write-locked for the update.
func (m ShardedMap) Set(key string, value interface{}) {
    owner := m.getShard(key)

    owner.Lock()
    defer owner.Unlock()

    owner.m[key] = value
}

// Keys returns every key currently stored in the map, in no particular order.
//
// The shards are scanned concurrently, each under its own read lock. Every
// goroutine gathers its shard's keys locally and appends them to the shared
// result under a mutex, because concurrent appends to one slice are a data
// race and can lose entries.
func (m ShardedMap) Keys() []string {
    keys := make([]string, 0)

    var (
        mu sync.Mutex // guards keys
        wg sync.WaitGroup
    )
    wg.Add(len(m))

    for _, shard := range m {
        go func(s *Shard) {
            defer wg.Done()

            s.RLock()
            local := make([]string, 0, len(s.m))
            for key := range s.m {
                local = append(local, key)
            }
            s.RUnlock()

            mu.Lock()
            keys = append(keys, local...)
            mu.Unlock()
        }(shard)
    }

    wg.Wait()

    return keys
}

usage

// main stores three entries in a five-shard map, prints each stored value
// and then prints every key the map reports.
func main() {
    shardedMap := NewSharedMap(5)

    shardedMap.Set("alpha", 1)
    shardedMap.Set("beta", 2)
    shardedMap.Set("gamma", 3)

    for _, name := range []string{"alpha", "beta", "gamma"} {
        fmt.Println(shardedMap.Get(name))
    }

    for _, k := range shardedMap.Keys() {
        fmt.Println(k)
    }
}