This is a post in the metronome-cacophony series.
Other posts in this series:

Metronome's Cacophony (7/14) - concurrency in Go - Atomicity

Asynchronous approach (ctd)

Atomic

Go provides the sync/atomic package to help you share state correctly. The tools in that package will help us implement the metronome's engine with much more reliable performance.

Let's start with a generic definition of atomicity:

atomicity

/ˌatəˈmɪsɪti/

noun

  1. [...]
  2. the state or fact of being composed of indivisible units.

-- by Google

In the context of task execution, atomicity means that a task has no apparent effect until it completes in its entirety.

This property enables us to safely share state between asynchronous tasks (in both single- and multi-threaded applications).

It is widely used in programming and database systems (for details, look at systems with ACID properties).

Time for a cooking analogy? You want to make a carbonated drink with homemade juice, without the refined sugars and other chemicals that most off-the-shelf products contain. You have a siphon and CO2 cartridges. Your siphon is unloaded, so you insert a new cartridge, press a button and voilà, a homemade fizzy drink appears. I consider piercing the cartridge an atomic task because, in normal circumstances, the cartridge is either sealed or pierced, with nothing in between. Until it is pierced you can still change your mind and put it back in the box, and it remains perfectly usable.

If we look at longer tasks, it becomes much harder to find an atomic equivalent. Cooking, by its nature, alters the physical and chemical properties of ingredients. An egg's proteins undergo gradual, heat-induced denaturation during boiling; marinating makes the product gradually absorb a foreign substance and softens its fibres; grating breaks a block of cheese into pieces. In these cases we see a slow, incremental change, and the ingredient is a little different at every stage. We can divide such tasks into stages, like a raw, soft-boiled or hard-boiled egg, or lightly and strongly marinated. Each stage has some effect before the task is finished, so these tasks are not atomic.

I'll leave it like that.

Solution with atomic package

I've managed to squeeze a few atomic calls into the next solution:

  • atomic.Load... - atomically load a value at an address
  • atomic.Store... - atomically set a value at an address
  • atomic.Swap... - sets a value at an address and returns the old value (the whole operation is atomic)

When a variable is accessed by multiple Goroutines and the only means of access is via functions from the atomic package, each read or write happens as one indivisible step, so no Goroutine can ever observe a half-finished write. It also means that a value written by one Goroutine is made visible to the others (even if the Goroutines happen to be running on different threads).
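
To make the three calls concrete, here is a minimal sketch that exercises them on a single int32. The function name demo and the values 5 and 9 are made up for illustration; only the atomic.StoreInt32, atomic.LoadInt32 and atomic.SwapInt32 calls come from the package itself.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// demo (a hypothetical helper) runs the three calls in sequence and
// returns what each step observed, so the behaviour is easy to check.
func demo() (loaded, swappedOut, final int32) {
	var shared int32 // zero value is 0

	atomic.StoreInt32(&shared, 5)             // shared now holds 5
	loaded = atomic.LoadInt32(&shared)        // reads 5
	swappedOut = atomic.SwapInt32(&shared, 9) // writes 9 and returns the old 5
	final = atomic.LoadInt32(&shared)         // reads 9
	return loaded, swappedOut, final
}

func main() {
	loaded, swappedOut, final := demo()
	fmt.Println(loaded, swappedOut, final) // prints: 5 5 9
}
```

Note that every function takes the address of the variable, which is why the solution below passes pointers around.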

Note: if you wonder how you can influence the number of logical cores that your Go program will use, consider calling runtime.GOMAXPROCS(), perhaps passing it runtime.NumCPU() if that suits your case.

I will split the solution into sections so that it is easier to explain. In-code comments will hopefully make it clearer as well. If you prefer to look at the raw code, go to GitHub.
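
As a rough sketch of how those two runtime calls fit together: runtime.GOMAXPROCS returns the previous setting, and passing a value below 1 only queries it without changing anything. The helper names currentParallelism and setParallelism are my own, not part of the metronome project.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentParallelism reports the current GOMAXPROCS setting.
// Passing 0 queries the value without changing it.
func currentParallelism() int {
	return runtime.GOMAXPROCS(0)
}

// setParallelism changes the setting and returns the previous one,
// which makes it easy to restore later.
func setParallelism(n int) int {
	return runtime.GOMAXPROCS(n)
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentParallelism())

	// Use every logical CPU, remembering the old setting.
	previous := setParallelism(runtime.NumCPU())
	fmt.Println("previous setting:", previous)
}
```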

 1
 2// these constants help us determine the stage our program is in, 
 3// variable sharedStage will be set to one of them
 4const (
 5   // initially, we haven't received our first volume measurement, so we need to wait
 6   StageNeedVolume      int32 = iota
 7
 8   // once we measure volume for the first time, this will indicate we can progress to beat performing
 9   StageFirstVolumeProvided
10
11   // indicate that we have performed enough beats and would like to finish gracefully
12   StageNeedTermination
13
14   // indicate that the goroutine measuring volume has terminated
15   StageTerminated
16)

Note: the use of the iota expression is not essential to the example; it is just a convenient way to assign consecutive numbers to a series of constants (kind of a poor man's enum). In the example above, StageNeedVolume equals 0, StageFirstVolumeProvided equals 1, and so on. Read more in Go's docs.
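
If you want to see the numbering for yourself, this small standalone program repeats the constant block and prints the values. Only the first constant spells out the type and the iota expression; the following ones repeat it implicitly, with iota growing by one per line.

```go
package main

import "fmt"

const (
	StageNeedVolume          int32 = iota // 0
	StageFirstVolumeProvided              // 1
	StageNeedTermination                  // 2
	StageTerminated                       // 3
)

func main() {
	fmt.Println(StageNeedVolume, StageFirstVolumeProvided,
		StageNeedTermination, StageTerminated) // prints: 0 1 2 3
}
```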

As I have mentioned before, we need to synchronise the efforts undertaken by our Goroutines.

It is not just the volume value that needs synchronised access, though.

For example, the beat-performing activity needs to know when the first volume measurement has been delivered. The volume-measuring activity needs to know when to finish. We also need to know when it is safe to terminate the application without abruptly killing any Goroutine, as that may lead to memory leaks and inconsistencies.

Note: I could have achieved this with a WaitGroup, mutex-protected variables or channels (more about the latter two later). I have used atomically synchronised variables to demonstrate the atomic toolset.
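
For comparison only, here is a minimal sketch of the WaitGroup alternative for the "wait until the Goroutine is done" part. It is not what the solution below does. The function collect and its measure parameter are hypothetical stand-ins for the volume-measuring activity.

```go
package main

import (
	"fmt"
	"sync"
)

// collect (hypothetical) takes n measurements in a separate goroutine and
// blocks until that goroutine has finished.
func collect(n int, measure func() int) []int {
	var wg sync.WaitGroup
	readings := make([]int, 0, n)

	wg.Add(1) // one goroutine to wait for
	go func() {
		defer wg.Done() // signal completion, even on an early return
		for i := 0; i < n; i++ {
			readings = append(readings, measure())
		}
	}()

	wg.Wait() // returns only after Done has been called
	// Reading the slice here is safe because Wait returned after Done.
	return readings
}

func main() {
	fmt.Println(collect(3, func() int { return 50 })) // prints: [50 50 50]
}
```

A WaitGroup only answers "has it finished?"; it does not carry the intermediate stages that the sharedStage variable communicates.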

 1func createRealTimeVolumeVariable(
 2   sharedStage *int32,
 3   volumeMeter metronome.VolumeMeter) (*int32) {
 4
 5   sharedVolumeReading := int32(-1)
 6
 7   // continually and sequentially measure volume
 8   go func() {
 9      // safely read the sharedStage value and execute the for-loop body, until we need to terminate
10      for atomic.LoadInt32(sharedStage) != StageNeedTermination {
11
12         // safeVolume is a locally scoped variable (not shared), which will temporarily hold the volume value
13         safeVolume := int32(volumeMeter())
14
15         // atomically set the sharedVolumeReading to safeVolume and
16         // store old value of sharedVolumeReading in safeOldVolume
17         // (sharedVolumeReading is set to -1 before this loop)
18         safeOldVolume := atomic.SwapInt32(&sharedVolumeReading, safeVolume)
19
20         // on the first iteration of this loop the following will evaluate to true
21         if safeOldVolume == -1 {
22            // indicate we have measured volume for the first time, go to next stage
23            atomic.StoreInt32(sharedStage, StageFirstVolumeProvided)
24         }
25      }
26
27      // indicate this Goroutine is just about to be done
28      atomic.StoreInt32(sharedStage, StageTerminated)
29   }()
30
31   return &sharedVolumeReading
32}

The function createRealTimeVolumeVariable spawns a new Goroutine, which constantly measures the volume and puts the latest value into the sharedVolumeReading variable. createRealTimeVolumeVariable returns a pointer to that variable immediately (without waiting for the Goroutine to complete). Obviously, until we measure an actual volume it will contain -1, as that's the value we've initialised it with (line 5). We see two separate examples of synchronised variables: sharedVolumeReading and sharedStage. I have used the "shared" prefix, which is not semantically significant but serves as a reminder for me to be extra vigilant when reading and writing these variables.

Note: you have to be disciplined about atomic access and use it throughout. If, for a given shared variable, you use it on all writes but omit it on reads, you have a data race: a read may happen in the middle of a write, or may not see the latest write at all, so the value you read can come back as gibberish. That's why I like to include a reminder in the name. You should also know there are other mechanisms for exchanging state in a concurrent environment (e.g. read about optimistic concurrency control), but it can get tricky.

The actual measurement happens in line 13. To store its result we could use the atomic.StoreInt32() function, but I have used atomic.SwapInt32() instead, because I also want to obtain the previous value held in that shared variable. That value tells me whether this volume measurement is the first one. If it is, we enter stage StageFirstVolumeProvided.
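
A small sketch of what "using it throughout" looks like in practice: several Goroutines update one counter, and every single access, writes and the final read alike, goes through the atomic package. The function countConcurrently is made up for this illustration, and atomic.AddInt32 is a call the solution in this post does not otherwise use.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently (hypothetical) lets several goroutines increment one
// shared counter. Every access to sharedCounter is atomic.
func countConcurrently(goroutines, incrementsEach int) int32 {
	var sharedCounter int32
	var wg sync.WaitGroup

	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < incrementsEach; i++ {
				atomic.AddInt32(&sharedCounter, 1) // atomic read-modify-write
			}
		}()
	}

	wg.Wait()
	return atomic.LoadInt32(&sharedCounter) // atomic read as well
}

func main() {
	fmt.Println(countConcurrently(8, 1000)) // prints: 8000
}
```

If you replace the atomic call with a plain sharedCounter++, the total will usually come out lower, and running the program with go run -race should report the data race.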
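
The "swap and inspect the old value" trick can be isolated into a few lines. In this sketch the names noReading, publish and firstFlags are hypothetical; the -1 sentinel is the same one the solution initialises sharedVolumeReading with.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// noReading is the sentinel meaning "nothing has been measured yet".
const noReading int32 = -1

// publish stores the new reading and reports whether it replaced the
// sentinel, i.e. whether this was the first measurement.
func publish(shared *int32, reading int32) (first bool) {
	old := atomic.SwapInt32(shared, reading)
	return old == noReading
}

// firstFlags publishes each reading in turn and records the answers.
func firstFlags(readings []int32) []bool {
	shared := noReading
	flags := make([]bool, 0, len(readings))
	for _, r := range readings {
		flags = append(flags, publish(&shared, r))
	}
	return flags
}

func main() {
	fmt.Println(firstFlags([]int32{70, 65, 80})) // prints: [true false false]
}
```

One caveat of the sentinel approach: it assumes a real measurement never equals -1, otherwise a later reading would be mistaken for the first one.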

 1func(bpm param.Bpm, performer metronome.BeatPerformer) {
 2
 3   // initial stage - we need first volume measurement before we start beating
 4   sharedStage := StageNeedVolume
 5
 6   // this call will create a Goroutine that will continually provide the latest volume in a
 7   // variable pointed to by sharedVolume
 8   sharedVolume := createRealTimeVolumeVariable(&sharedStage, volumeMeter)
 9
10   // this loop will wait for when the stage indicates we have first volume measurement
11   for atomic.LoadInt32(&sharedStage) != StageFirstVolumeProvided {
12   }
13
14   // standard Ticker
15   ticker := time.NewTicker(bpm.Interval())
16   
17   // and deferred tear down of it
18   defer ticker.Stop()
19
20   for beatCount := 0; beatCount < numberOfBeats; beatCount++ {
21
22      // safely fetch the value of variable referenced by sharedVolume
23      // it will not block but retrieve whatever was the last value
24      safeVolume := int(atomic.LoadInt32(sharedVolume))
25
26      // standard beat performance
27      performer(beatCount, safeVolume)
28
29      // wait for next timing message
30      <-ticker.C
31   }
32
33   // indicate that we have performed enough beats and would like
34   // to terminate volume measurement. That will allow us to gracefully and
35   // in controlled manner terminate program.
36   atomic.StoreInt32(&sharedStage, StageNeedTermination)
37
38   // wait until volume measurement Goroutine is done
39   for atomic.LoadInt32(&sharedStage) != StageTerminated {
40   }
41}

In line 8, we invoke createRealTimeVolumeVariable so that the Goroutine measuring volume can start. The latest volume is accessible by dereferencing the sharedVolume pointer with a function from the atomic package. You could technically dereference it directly (as in "nothing will stop you from doing harm"), for example:

1   volume := *sharedVolume

but that read is not synchronised: it is a data race, so the resulting value becomes unreliable as contention increases.

Instead we use (line 24)

1   safeVolume := int(atomic.LoadInt32(sharedVolume))

to obtain a copy of that value and store it in a local (i.e. not shared) variable, safeVolume. We can operate on safeVolume without synchronisation because it is just a copy of the shared value, not the shared value itself.

The rest of the code contains repeated usage of already discussed constructs so I won't go through it.
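
If you would like to run the whole approach without the rest of the project, here is a condensed, self-contained sketch of the two snippets above. It rests on several assumptions of mine: a plain func() int stands in for metronome.VolumeMeter, the beat loop returns the volumes instead of calling a BeatPerformer, the fake meter always reads 50, and the waiting loops call runtime.Gosched() to yield while spinning (the original loops are empty).

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

const (
	StageNeedVolume int32 = iota
	StageFirstVolumeProvided
	StageNeedTermination
	StageTerminated
)

// startMeasuring mirrors createRealTimeVolumeVariable. A plain func() int
// stands in for metronome.VolumeMeter (an assumption of this sketch).
func startMeasuring(sharedStage *int32, volumeMeter func() int) *int32 {
	sharedVolumeReading := int32(-1)
	go func() {
		for atomic.LoadInt32(sharedStage) != StageNeedTermination {
			safeVolume := int32(volumeMeter())
			if atomic.SwapInt32(&sharedVolumeReading, safeVolume) == -1 {
				atomic.StoreInt32(sharedStage, StageFirstVolumeProvided)
			}
		}
		atomic.StoreInt32(sharedStage, StageTerminated)
	}()
	return &sharedVolumeReading
}

// waitForStage spins until sharedStage holds the wanted stage.
// runtime.Gosched is my addition; it yields the processor between checks.
func waitForStage(sharedStage *int32, wanted int32) {
	for atomic.LoadInt32(sharedStage) != wanted {
		runtime.Gosched()
	}
}

// play performs numberOfBeats beats and returns the volume read for each
// one (the original hands the volume to a BeatPerformer instead).
func play(numberOfBeats int, interval time.Duration, volumeMeter func() int) []int {
	sharedStage := StageNeedVolume
	sharedVolume := startMeasuring(&sharedStage, volumeMeter)
	waitForStage(&sharedStage, StageFirstVolumeProvided)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	volumes := make([]int, 0, numberOfBeats)
	for beatCount := 0; beatCount < numberOfBeats; beatCount++ {
		volumes = append(volumes, int(atomic.LoadInt32(sharedVolume)))
		<-ticker.C
	}

	atomic.StoreInt32(&sharedStage, StageNeedTermination)
	waitForStage(&sharedStage, StageTerminated)
	return volumes
}

// demo runs four beats, 10ms apart, against a fake meter that always reads 50.
func demo() []int {
	fakeMeter := func() int {
		time.Sleep(time.Millisecond) // pretend that measuring takes a moment
		return 50
	}
	return play(4, 10*time.Millisecond, fakeMeter)
}

func main() {
	fmt.Println(demo()) // prints: [50 50 50 50]
}
```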

Time to see if this much-changed metronome engine also produces a cacophony of beats.

[Interactive diagram: Asynchronous - atomic operations as means to synchronise access to shared state]

Way better!

Frequency looks regular, execution time looks OK, there are no ordering issues, and all Goroutines finished before termination.

Conclusion

Functions within the atomic package allow you to synchronise access to shared state without major effort. In the snippets I've limited myself to the int32 type, but there are a few other signatures for other types. It looks like you can even synchronise structs by using the atomic.Value type along with its .Load() and .Store() methods, but I haven't had a chance to use it yet. A bit more about atomic.Value in this blog.

Java analogy: Java (1.5+) ships with its own atomic package, java.util.concurrent.atomic.

I want to tell you that you can write an equivalent of the atomic package yourself, with the use of mutexes (next chapter). If, however, you need to synchronise access to a simple variable that the atomic package already caters for, it will be easier to use that package. The advantage is that you don't need to write more code (which can have bugs); built-in options tend to be more performant and are usually well designed and thoroughly tested, which takes away quite a lot of potential pain.
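
As a tentative sketch of the atomic.Value idea (I have not used it in the metronome itself): the struct reading and the helpers latest and demo are invented for this example, while Store and Load are the methods atomic.Value actually offers. Two things to keep in mind are that Load returns nil until the first Store, and that every Store on a given atomic.Value must use the same concrete type.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// reading is a hypothetical struct we would like to share as one unit.
type reading struct {
	Volume int
	Beat   int
}

// latest returns the most recently stored reading, and false when
// nothing has been stored yet (Load returns nil in that case).
func latest(v *atomic.Value) (reading, bool) {
	r, ok := v.Load().(reading)
	return r, ok
}

// demo checks the empty state, stores two readings and loads the latest.
func demo() (hadValueBefore bool, after reading) {
	var shared atomic.Value

	_, hadValueBefore = latest(&shared) // false: nothing stored yet

	shared.Store(reading{Volume: 70, Beat: 1})
	shared.Store(reading{Volume: 65, Beat: 2}) // replaces the whole struct

	after, _ = latest(&shared)
	return hadValueBefore, after
}

func main() {
	fmt.Println(demo()) // prints: false {65 2}
}
```

The struct is stored by value, so readers always get a complete copy and never a mix of fields from two different writes.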
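
To give a feel for the comparison, here is a minimal sketch of a hand-rolled, mutex-guarded equivalent of the three int32 calls used in this post. The type lockedInt32 and its methods are hypothetical names of mine, and this is only an approximation of the idea, not how sync/atomic is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedInt32 is a hypothetical mutex-guarded stand-in for an int32 that
// would otherwise be accessed through sync/atomic.
type lockedInt32 struct {
	mu    sync.Mutex
	value int32
}

// Load returns the current value while holding the lock.
func (l *lockedInt32) Load() int32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.value
}

// Store replaces the current value while holding the lock.
func (l *lockedInt32) Store(v int32) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.value = v
}

// Swap replaces the value and returns the old one; both steps happen
// under one lock, so no other goroutine can slip in between them.
func (l *lockedInt32) Swap(v int32) (old int32) {
	l.mu.Lock()
	defer l.mu.Unlock()
	old, l.value = l.value, v
	return old
}

// demo stores the -1 sentinel, swaps in a reading and loads it back.
func demo() (swappedOut, final int32) {
	var shared lockedInt32
	shared.Store(-1)
	swappedOut = shared.Swap(70)
	final = shared.Load()
	return swappedOut, final
}

func main() {
	fmt.Println(demo()) // prints: -1 70
}
```

Even this tiny version is about thirty lines that you would have to maintain and test, which is the point being made above.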

While browsing, I came across many questions to the effect of "in Go, is function/method/package X/Y/Z safe for concurrent use?". The answer is that nothing is, unless explicitly confirmed in the docs.

