Metronome's Cacophony (7/14) - concurrency in Go - Atomicity
Asynchronous approach (ctd)
Atomic
Go provides the sync/atomic package to help you share state correctly.
The tools in that package will help us implement a metronome engine that behaves much more reliably.
Let's start with a generic definition of atomicity:
atomicity
/ˌatəˈmɪsɪti/
noun
- [...]
- the state or fact of being composed of indivisible units.
-- by Google
In the context of task execution, atomicity means that a task has no apparent effect until it completes in its entirety.
This property lets us safely share state between asynchronous tasks (in both single- and multi-threaded applications).
It is widely used in programming and in database systems (for details, look at systems with ACID properties).
Time for a cooking analogy? You want to make a carbonated drink from homemade juice, with none of the refined sugars and other chemicals that most off-the-shelf products contain. You have a siphon and CO2 cartridges. The siphon is unloaded, so you insert a new cartridge, press the button and voilà, a homemade fizzy drink appears. I consider piercing the cartridge an atomic task because, in normal circumstances, the cartridge is either sealed or pierced. Until it is pierced you can change your mind and put the cartridge back in the box, and it is still perfectly usable.
With longer tasks it becomes much harder to find an atomic equivalent. Cooking, by its nature, alters the physical and chemical properties of ingredients. Egg proteins undergo gradual, heat-induced denaturation during boiling; marinating makes the product gradually absorb a foreign substance and softens its fibres; grating breaks a block of cheese into pieces. In these cases the properties change slowly and incrementally, and the ingredient is a little different at every stage. We can divide such tasks into stages, like raw, soft-boiled and hard-boiled egg, or lightly and strongly marinated. Each stage has some effect before the task is finished, so these tasks are not atomic.
I'll leave it like that.
Solution with atomic package
I've managed to squeeze a few atomic calls into the next solution:
- atomic.Load... - atomically load a value at an address
- atomic.Store... - atomically set a value at an address
- atomic.Swap... - sets a value at an address and returns the old value (the whole operation is atomic)
When a variable is accessed by multiple Goroutines and the only means of access is via functions from the atomic
package, each operation on that variable is indivisible: no Goroutine can observe it half-done. It also means that
the written value is made visible to other threads (if the Goroutines even happen to be running on different threads).
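To make the three calls listed above concrete, here is a minimal sketch that applies them to a single int32. The function name swapDemo and the values 10 and 20 are made up for this illustration; they are not part of the metronome code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// swapDemo (hypothetical helper) walks through Store, Swap and Load on one
// int32 and returns the value handed back by Swap and the final value.
func swapDemo() (old, current int32) {
	var shared int32

	atomic.StoreInt32(&shared, 10)      // shared now holds 10
	old = atomic.SwapInt32(&shared, 20) // returns 10, shared now holds 20
	current = atomic.LoadInt32(&shared) // reads 20
	return old, current
}

func main() {
	old, current := swapDemo()
	fmt.Println(old, current) // prints "10 20"
}
```

Note that every function takes the address of the variable, not its value, which is why the solution below passes pointers around.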
Note:
If you wonder how you can influence the number of logical cores that your Go program will use, consider calling
runtime.GOMAXPROCS(), perhaps passing runtime.NumCPU() if that suits your case.
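A minimal sketch of the two runtime calls mentioned in the note. The helper name currentParallelism is an assumption made for this example. Passing a value below 1 to runtime.GOMAXPROCS only queries the setting, and recent Go versions already default to the available CPUs, so the explicit call is rarely needed.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentParallelism (hypothetical helper) reports the GOMAXPROCS setting
// without changing it: an argument below 1 only queries the current value.
func currentParallelism() int {
	return runtime.GOMAXPROCS(0)
}

func main() {
	fmt.Println("logical CPUs:", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", currentParallelism())

	// Explicitly allow Go code to run on as many threads as there are
	// logical CPUs. The call returns the previous setting.
	previous := runtime.GOMAXPROCS(runtime.NumCPU())
	fmt.Println("previous setting:", previous)
}
```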
I will split the solution into sections so that it is easier to explain. In-code comments will hopefully make it clearer as well. If you prefer to look at the raw code, go to BitBucket.
// these constants help us determine the stage our program is in,
// variable sharedStage will be set to one of them
const (
    // initially, we haven't received our first volume measurement, so we need to wait
    StageNeedVolume int32 = iota

    // once we measure volume for the first time, this will indicate we can progress to beat performing
    StageFirstVolumeProvided

    // indicate that we have performed enough beats and would like to finish gracefully
    StageNeedTermination

    // indicate that the goroutine measuring volume has terminated
    StageTerminated
)
Note: the use of the iota expression is not relevant to the example; it is just a convenient way to assign
consecutive numbers to a series of constants (kind of a poor man's enum).
In the example above StageNeedVolume will be equal to 0, StageFirstVolumeProvided equal to 1, and so on.
Read more in Go's docs
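If you want to see the numbering for yourself, here is a small sketch. The names StageA, StageB and StageC are placeholders invented for this illustration, not the constants from the solution.

```go
package main

import "fmt"

// Hypothetical stage names, used only for this illustration.
const (
	StageA int32 = iota // 0
	StageB              // 1: the type and the iota expression are repeated implicitly
	StageC              // 2
)

// stageValues returns the constants in declaration order.
func stageValues() []int32 {
	return []int32{StageA, StageB, StageC}
}

func main() {
	fmt.Println(stageValues()) // prints "[0 1 2]"
}
```

Declaring the type on the first constant matters here: it makes every constant an int32, so they can be passed straight to the atomic functions without conversion.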
As I have mentioned before, we need to synchronise the efforts undertaken by our Goroutines.
It is not just the volume value that needs synchronised access, though.
For example, the beat-performing activity needs to know when the first volume measurement has been delivered. The volume-measuring activity needs to know when to finish. We also need to know when it is safe to terminate the application without abruptly terminating any Goroutine, as that may lead to memory leaks and inconsistencies.
Note: I could have achieved this with WaitGroup, mutex-protected variables or channels (more about
the latter two later). I have used atomically synchronised variables to demonstrate the atomic
toolset.
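For comparison, this is roughly what the "wait until the measuring Goroutine is done" part could look like with a WaitGroup instead of a dedicated stage. It is only a sketch under assumptions: runWorker, stop and readings are names invented here, and incrementing a counter stands in for the real volume measurement.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorker (hypothetical) starts a Goroutine that "measures" until asked to
// stop, waits for it with a WaitGroup, and returns the number of measurements.
func runWorker() int32 {
	var (
		stop     int32 // 0 = keep going, 1 = please finish
		readings int32 // number of measurements taken so far
		wg       sync.WaitGroup
	)

	wg.Add(1)
	go func() {
		defer wg.Done() // signals completion, replacing a "terminated" stage
		for {
			atomic.AddInt32(&readings, 1) // stand-in for a volume measurement
			if atomic.LoadInt32(&stop) == 1 {
				return
			}
		}
	}()

	atomic.StoreInt32(&stop, 1) // ask the Goroutine to finish
	wg.Wait()                   // block until it has actually finished
	return atomic.LoadInt32(&readings)
}

func main() {
	fmt.Println("measurements taken:", runWorker())
}
```

The stop request still goes through an atomic variable; only the waiting is handed over to the WaitGroup, which blocks instead of polling.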
 1  func createRealTimeVolumeVariable(
 2      sharedStage *int32,
 3      volumeMeter metronome.VolumeMeter) *int32 {
 4
 5      sharedVolumeReading := int32(-1)
 6
 7      // continually and sequentially measure volume
 8      go func() {
 9          // safely read the sharedStage value and execute the for-loop body, until we need to terminate
10          for atomic.LoadInt32(sharedStage) != StageNeedTermination {
11
12              // safeVolume is a locally-scoped variable (not shared), which will temporarily hold the volume value
13              safeVolume := int32(volumeMeter())
14
15              // atomically set the sharedVolumeReading to safeVolume and
16              // store the old value of sharedVolumeReading in safeOldVolume
17              // (sharedVolumeReading is set to -1 before this loop)
18              safeOldVolume := atomic.SwapInt32(&sharedVolumeReading, safeVolume)
19
20              // on the first iteration of this loop the following will evaluate to true
21              if safeOldVolume == -1 {
22                  // indicate we have measured volume for the first time, go to next stage
23                  atomic.StoreInt32(sharedStage, StageFirstVolumeProvided)
24              }
25          }
26
27          // indicate this Goroutine is just about to be done
28          atomic.StoreInt32(sharedStage, StageTerminated)
29      }()
30
31      return &sharedVolumeReading
32  }
Function createRealTimeVolumeVariable spawns a new Goroutine, which constantly measures the volume and puts
the latest value into the sharedVolumeReading variable.
A pointer to that variable is returned immediately (without waiting for the Goroutine to complete).
Obviously, until we measure an actual volume it will contain -1, as that's the value we've initialised it with (line 5).
We see two separate examples of synchronised variables: sharedVolumeReading and sharedStage.
I have used the "shared" prefix, which is not semantically significant but serves as a reminder for me to be extra
vigilant when reading and writing these variables.
Note: you have to be disciplined about atomic access and use it throughout. If, for a given shared variable, you use atomic functions on all writes but omit them on reads, you still have a data race: a read may overlap a write, and the value you read is then not guaranteed to be meaningful. That's why I like to include a reminder in the name. You should also know there are other mechanisms for exchanging state in a concurrent environment (e.g. read about optimistic concurrency control), but they can get tricky.
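Besides a naming convention, the type system can enforce this discipline. The sketch below assumes Go 1.19 or later, where sync/atomic offers typed values such as atomic.Int32 whose contents are only reachable through atomic methods. The function countConcurrently is made up for this illustration and is not part of the metronome code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countConcurrently (hypothetical) increments one shared counter from several
// Goroutines and returns the final value.
func countConcurrently(goroutines, perGoroutine int) int32 {
	var sharedCounter atomic.Int32 // no way to read or write it non-atomically
	var wg sync.WaitGroup

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perGoroutine; j++ {
				sharedCounter.Add(1) // atomic read-modify-write
			}
		}()
	}

	wg.Wait()
	return sharedCounter.Load()
}

func main() {
	fmt.Println(countConcurrently(8, 1000)) // prints "8000"
}
```

With a plain int32 and sharedCounter++ the result would be unpredictable; running such code with the race detector (go run -race) is a good way to catch a forgotten atomic call.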
The actual measurement happens in line 13.
To store its result we could use the atomic.StoreInt32() function, but I have used atomic.SwapInt32() instead.
The reason is that I also want to obtain the previous value held in that shared variable.
That value helps me determine whether this volume measurement is the first one.
If it is, we enter stage StageFirstVolumeProvided.
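The "swap and inspect the old value" idea can be isolated into a few lines. This is a sketch only: recordVolume and noReading are names invented here, and the volumes 40 and 55 are arbitrary.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// noReading is the sentinel stored before any measurement exists
// (the solution uses a literal -1 for the same purpose).
const noReading int32 = -1

// recordVolume (hypothetical helper) stores v and reports whether it replaced
// the sentinel, that is, whether this was the first measurement.
func recordVolume(sharedVolume *int32, v int32) bool {
	return atomic.SwapInt32(sharedVolume, v) == noReading
}

func main() {
	sharedVolume := noReading

	first := recordVolume(&sharedVolume, 40)
	second := recordVolume(&sharedVolume, 55)

	fmt.Println(first, second) // prints "true false"
}
```

Because the store and the read of the old value happen as one atomic step, no other Goroutine can slip a write in between them. The approach does rely on the sentinel never being a legitimate measurement.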
 1  func(bpm param.Bpm, performer metronome.BeatPerformer) {
 2
 3      // initial stage - we need first volume measurement before we start beating
 4      sharedStage := StageNeedVolume
 5
 6      // this call will create a Goroutine that will continually provide the latest volume in a
 7      // variable pointed to by sharedVolume
 8      sharedVolume := createRealTimeVolumeVariable(&sharedStage, volumeMeter)
 9
10      // this loop will wait for when the stage indicates we have first volume measurement
11      for atomic.LoadInt32(&sharedStage) != StageFirstVolumeProvided {
12      }
13
14      // standard Ticker
15      ticker := time.NewTicker(bpm.Interval())
16
17      // and deferred tear down of it
18      defer ticker.Stop()
19
20      for beatCount := 0; beatCount < numberOfBeats; beatCount++ {
21
22          // safely fetch the value of variable referenced by sharedVolume
23          // it will not block but retrieve whatever was the last value
24          safeVolume := int(atomic.LoadInt32(sharedVolume))
25
26          // standard beat performance
27          performer(beatCount, safeVolume)
28
29          // wait for next timing message
30          <-ticker.C
31      }
32
33      // indicate that we have performed enough beats and would like
34      // to terminate volume measurement. That will allow us to gracefully and
35      // in controlled manner terminate program.
36      atomic.StoreInt32(&sharedStage, StageNeedTermination)
37
38      // wait until volume measurement Goroutine is done
39      for atomic.LoadInt32(&sharedStage) != StageTerminated {
40      }
41  }
In line 8, we invoke createRealTimeVolumeVariable so that the Goroutine measuring volume can start.
The latest volume will be accessible by dereferencing the sharedVolume pointer with a function from the atomic package.
You could technically dereference it directly (as in "nothing will stop you from doing harm"), for example:
    volume := *sharedVolume
but that's not synchronised, so the resulting value will be unreliable under increased contention.
Instead we use (line 24)
    safeVolume := int(atomic.LoadInt32(sharedVolume))
to obtain a copy of that value and store it in a local (i.e. not shared) variable, safeVolume.
We can operate on safeVolume without synchronisation because it is just a copy of the shared value,
not the shared value itself.
The rest of the code contains repeated usage of already discussed constructs so I won't go through it.
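One construct that may deserve a second look is the empty for-loop used to wait for a stage. A possible refinement, shown here as a sketch rather than as part of the original solution, is to yield to the scheduler on every iteration. The helper names waitForStage and reachStage are assumptions made for this example.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// waitForStage (hypothetical helper) polls the shared stage until it equals
// want, yielding the processor between checks so other Goroutines can run.
func waitForStage(sharedStage *int32, want int32) {
	for atomic.LoadInt32(sharedStage) != want {
		runtime.Gosched()
	}
}

// reachStage starts a Goroutine that moves the stage from 0 to 1 and waits
// for that change to become visible.
func reachStage() int32 {
	var sharedStage int32

	go func() {
		atomic.StoreInt32(&sharedStage, 1)
	}()

	waitForStage(&sharedStage, 1)
	return atomic.LoadInt32(&sharedStage)
}

func main() {
	fmt.Println(reachStage()) // prints "1"
}
```

This is still polling; it merely gives the Goroutine we are waiting for a better chance to make progress.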
Time to see if this much-changed metronome engine also produces a cacophony of beats.
Way better!
Frequency looks regular, execution time looks ok, no ordering issues, all Goroutines finished before termination.
Conclusion
Functions in the atomic package allow you to synchronise access to shared state without major effort.
In the snippets I've limited myself to the int32 type, but there are a few other signatures for other types.
It looks like you can even synchronise structs by using the atomic.Value type along with
its methods .Load() and .Store(), but I haven't had a chance to use it yet.
A bit more about atomic.Value in this blog
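As a rough idea of what that could look like, here is a minimal sketch of atomic.Value holding a struct. The reading type and its fields are invented for this illustration. Load returns an empty interface, so a type assertion is needed, and every Store on the same atomic.Value must use the same concrete type.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// reading is a hypothetical struct grouping a volume with a sequence number.
type reading struct {
	Volume int
	Seq    int
}

// latestReading stores two readings in turn and loads the most recent one.
func latestReading() reading {
	var shared atomic.Value

	shared.Store(reading{Volume: 40, Seq: 1})
	shared.Store(reading{Volume: 55, Seq: 2}) // replaces the whole struct at once

	// Load returns interface{}, so assert back to the stored type.
	return shared.Load().(reading)
}

func main() {
	fmt.Println(latestReading()) // prints "{55 2}"
}
```

The struct is swapped as a whole, so a reader sees either the old reading or the new one, never a mix of fields from both.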
Java analogy: Java (1.5+) ships with its own atomic package (java.util.concurrent.atomic).
Keep in mind that you could write an equivalent of the atomic package yourself with mutexes (next chapter).
If, however, you need to synchronise access to a simple variable that the atomic package already caters for,
it will be easier to use that package.
The advantage is that you don't need to write more code (which can have bugs); built-in options tend to be more
performant and are usually well designed and thoroughly tested, which takes away quite a lot of potential pain.
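To give a feel for the extra code involved, here is a sketch of a mutex-based counterpart to the three int32 operations used in this post. The type lockedInt32 and its methods are hypothetical; mutexes themselves are explained in the next chapter.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedInt32 (hypothetical) guards an int32 with a mutex so that Load, Store
// and Swap each run as one indivisible step.
type lockedInt32 struct {
	mu    sync.Mutex
	value int32
}

// Load returns the current value.
func (l *lockedInt32) Load() int32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.value
}

// Store replaces the current value.
func (l *lockedInt32) Store(v int32) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.value = v
}

// Swap replaces the current value and returns the previous one.
func (l *lockedInt32) Swap(v int32) int32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	old := l.value
	l.value = v
	return old
}

func main() {
	var shared lockedInt32
	shared.Store(10)
	old := shared.Swap(20)
	fmt.Println(old, shared.Load()) // prints "10 20"
}
```

It works, but it is around thirty lines replacing three library calls, which is exactly the kind of code you would rather not have to maintain.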
While browsing, I came across many questions to the effect of "in Go, is function/method/package X/Y/Z safe for concurrent use?". The answer is that nothing is, unless explicitly confirmed in the docs.
Other posts in metronome-cacophony series
- Introduction
- Basic synchronous solution
- Ticker
- Goroutine
- WaitGroup
- Sharing state
- Atomicity
- Mutex
- Channels introduction
- Channel select
- Goroutines and channels
- Solution with channel
- Videos and final word
- Appendices