Metronome's Cacophony (8/14) - concurrency in Go - Mutex

Asynchronous approach (ctd)

Mutex

The term stands for mutual exclusion. In programming, it is a property that limits access to a resource to one thread (or even one consumer, due to the lack of reentrancy; more about that later) at a time. Sound familiar? In the previous section, the atomic functions helped us achieve just that in the context of simple variables.

Atomic functions allow you to write, read or compare variable values in a mutually exclusive fashion. Mutexes can do more than that: you can make any section of code mutually exclusive. We do this by acquiring a lock before such a section and releasing it once we are done. It is as if a guard were hired to fend off any other actors from our code while it is executing.
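
As a minimal sketch of that lock-then-release pattern, consider a counter incremented from many Goroutines. The counter type and the countConcurrently helper are hypothetical names invented for this illustration; they are not part of the metronome code.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical type used only for this illustration.
type counter struct {
	mu    sync.Mutex
	value int
}

// increment guards the read-modify-write of value with the mutex.
func (c *counter) increment() {
	c.mu.Lock()         // acquire the lock before the critical section
	defer c.mu.Unlock() // release it when the function returns
	c.value++
}

// countConcurrently starts n goroutines that each increment the counter once
// and returns the final value after all of them have finished.
func countConcurrently(n int) int {
	var c counter
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			c.increment()
		}()
	}
	wg.Wait()

	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func main() {
	fmt.Println(countConcurrently(1000)) // prints 1000
}
```

Without the mutex, the increments could interleave and some of them might be lost.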

Consider an example without a mutex:

package main

import (
   "log"
   "sync"
   "time"
)

const (
   bathroom = "***bathroom***"
   kitchen  = "kitchen"
)

var (
   visitorsGroup sync.WaitGroup
)

func visitRoom(person, room string) {
   log.Println(person, "is entering", room)
   time.Sleep(time.Second)
   log.Println(person, "is leaving", room)
   visitorsGroup.Done()
}

func main() {
   visitorsGroup.Add(4)
   go visitRoom("Dan", kitchen)
   go visitRoom("Dave", bathroom)
   go visitRoom("Elton", kitchen)
   go visitRoom("Elvis", bathroom)

   visitorsGroup.Wait()
   log.Println("program ended")
}

Output:

2018/05/01 12:24:05 Elvis is entering ***bathroom***
2018/05/01 12:24:05 Dan is entering kitchen
2018/05/01 12:24:05 Dave is entering ***bathroom***
2018/05/01 12:24:05 Elton is entering kitchen
2018/05/01 12:24:06 Dan is leaving kitchen
2018/05/01 12:24:06 Elvis is leaving ***bathroom***
2018/05/01 12:24:06 Dave is leaving ***bathroom***
2018/05/01 12:24:06 Elton is leaving kitchen
2018/05/01 12:24:06 program ended

Process finished with exit code 0

The output shows that Dan and Elton were in the kitchen at the same time, and so were Elvis and Dave in the bathroom. While most kitchens can accommodate more than one person at once, the nature of a bathroom (with certain presumptions) implies mutual exclusion. The resources in this case are the rooms, and the threads are the people using them.

Let's introduce a mutex.

package main

import (
   "log"
   "sync"
   "time"
)

const (
   bathroom = "***bathroom***"
   kitchen  = "kitchen"
)

var (
   visitorsGroup sync.WaitGroup
   bathroomMutex sync.Mutex
)

func visitRoom(person, room string) {
   if room == bathroom {
      bathroomMutex.Lock()

      // defer runs when the function returns, not when this if block ends
      defer bathroomMutex.Unlock()
   }
   log.Println(person, "is entering", room)
   time.Sleep(time.Second)
   log.Println(person, "is leaving", room)
   visitorsGroup.Done()
}

func main() {
   visitorsGroup.Add(4)
   go visitRoom("Dan", kitchen)
   go visitRoom("Dave", bathroom)
   go visitRoom("Elton", kitchen)
   go visitRoom("Elvis", bathroom)

   visitorsGroup.Wait()
   log.Println("program ended")
}

Output:

2018/05/01 12:33:41 Elvis is entering ***bathroom***
2018/05/01 12:33:41 Dan is entering kitchen
2018/05/01 12:33:41 Elton is entering kitchen
2018/05/01 12:33:42 Elton is leaving kitchen
2018/05/01 12:33:42 Dan is leaving kitchen
2018/05/01 12:33:42 Elvis is leaving ***bathroom***
2018/05/01 12:33:42 Dave is entering ***bathroom***
2018/05/01 12:33:43 Dave is leaving ***bathroom***
2018/05/01 12:33:43 program ended

Process finished with exit code 0

Now the bathroom is used by one person at a time, while the kitchen is shared between two people. Once Elvis has left the (bath)room, Dave enters.

A mutex is a powerful but dangerous construct.

It is worth noting that all operations that require synchronised access should be as quick as possible, so that the locked shared resource can be released for others. People with several kids and one bathroom know very well how important this is. Do whatever requires privacy while locked inside; brush your hair outside.

In programming, holding a mutex around long-running operations will cause performance stutters in your application. We describe this as a situation with high resource or thread contention.

Also, be extra vigilant when calling other mutex-synchronised operations from within mutex-synchronised operations. By doing so you may summon deadlocks, which will make your application grind to a halt. A deadlock occurs when two tasks wait for each other to finish but neither of them can, so both wait forever. Another piece of advice is to release your locks in the reverse order to the one you acquired them in.
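
To illustrate keeping the locked section short, here is a sketch that does the expensive work before taking the lock and holds the mutex only for the final append. The results type, slowTransform and processAll are hypothetical names made up for this example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// results is a hypothetical shared store used only for this illustration.
type results struct {
	mu    sync.Mutex
	items []string
}

// slowTransform stands in for any expensive computation.
func slowTransform(s string) string {
	return strings.ToUpper(s)
}

// add does the expensive work outside the lock and holds the mutex
// only for the append to the shared slice.
func (r *results) add(input string) {
	transformed := slowTransform(input) // no lock held here

	r.mu.Lock()
	r.items = append(r.items, transformed)
	r.mu.Unlock()
}

// len returns the number of stored items under the lock.
func (r *results) len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.items)
}

// processAll adds every input from its own goroutine and returns the item count.
func processAll(inputs []string) int {
	var r results
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			r.add(s)
		}(in)
	}
	wg.Wait()
	return r.len()
}

func main() {
	fmt.Println(processAll([]string{"a", "b", "c"})) // prints 3
}
```

Had slowTransform been called between Lock and Unlock, every Goroutine would have queued behind it.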
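
The sketch below shows one way to nest two locks without deadlocking. The account type and the transfer function are hypothetical. Acquiring the locks in a fixed order (here: ascending id) is an assumption added for this example rather than something prescribed above; the deferred unlocks then run in reverse order of acquisition.

```go
package main

import (
	"fmt"
	"sync"
)

// account is a hypothetical type used only for this illustration.
type account struct {
	id      int
	mu      sync.Mutex
	balance int
}

// transfer moves amount between two distinct accounts. Both locks are always
// taken in ascending id order, so two opposite transfers cannot end up
// waiting for each other. Passing the same account twice is not supported.
func transfer(from, to *account, amount int) {
	first, second := from, to
	if second.id < first.id {
		first, second = second, first
	}

	first.mu.Lock()
	defer first.mu.Unlock() // runs last
	second.mu.Lock()
	defer second.mu.Unlock() // runs first: deferred calls run in reverse order

	from.balance -= amount
	to.balance += amount
}

// runTransfers performs the same number of transfers in both directions
// concurrently and returns the final balances.
func runTransfers(rounds int) (int, int) {
	a := &account{id: 1, balance: 100}
	b := &account{id: 2, balance: 100}

	var wg sync.WaitGroup
	wg.Add(2 * rounds)
	for i := 0; i < rounds; i++ {
		go func() { defer wg.Done(); transfer(a, b, 1) }()
		go func() { defer wg.Done(); transfer(b, a, 1) }()
	}
	wg.Wait()
	return a.balance, b.balance
}

func main() {
	fmt.Println(runTransfers(50)) // prints 100 100
}
```

If transfer locked from first and to second regardless of id, two opposite transfers could each hold one lock and wait forever for the other.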

Java analogy: Mutex is just another term for a lock. Java comes with a variety of locks; have a look at the java.util.concurrent.locks package. Be careful, because Go's locks are not reentrant: a Goroutine that already holds a sync.Mutex and tries to lock it again will block. If you are really missing reentrant locks, have a look at this discussion to learn why reentrant locking can defeat the very purpose of locking in the first place.
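
The lack of reentrancy can be observed without hanging the program. This sketch assumes Go 1.18 or newer, where sync.Mutex gained TryLock; the reacquire function is a hypothetical helper for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// reacquire reports whether code that already holds the mutex
// can take it a second time.
func reacquire() bool {
	var mu sync.Mutex
	mu.Lock()

	// TryLock attempts to take the lock without blocking. A plain Lock()
	// here would block forever, because the mutex is already held.
	acquiredAgain := mu.TryLock()

	mu.Unlock()
	return acquiredAgain
}

func main() {
	fmt.Println(reacquire()) // prints false
}
```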

Solution with Mutex

Again, due to the size of this solution, I will split it by function:

// Because this struct and the demo code reside in the same package, it may be
// tempting to access the struct's fields directly. Don't, unless you
// synchronise access to them.
type SharedState struct {
   sync.Mutex
   volume               int
   firstMeasurementDone bool
   terminationRequested bool
   measuringFinished    bool
}

func NewSharedState() *SharedState {
   return &SharedState{
      volume: -1,
   }
}

// convenience method reducing the locking/unlocking boilerplate from 2 lines to 1
func (s *SharedState) lockNow() *sync.Mutex {
   s.Lock()
   return &s.Mutex
}

func (s *SharedState) KeepMeasuring() bool {
   defer s.lockNow().Unlock() // lockNow() is invoked straight away, only Unlock() is deferred
   return !s.terminationRequested
}

func (s *SharedState) RequestTermination() {
   defer s.lockNow().Unlock()
   s.terminationRequested = true
}

func (s *SharedState) VolumeMeasuredAtLeastOnce() bool {
   defer s.lockNow().Unlock()
   return s.volume != -1
}

func (s *SharedState) NewVolumeMeasurement(volume int) {
   defer s.lockNow().Unlock()
   s.volume = volume
}

func (s *SharedState) MeasuringFinished() {
   defer s.lockNow().Unlock()
   s.measuringFinished = true
}

func (s *SharedState) HasMeasuringFinished() bool {
   defer s.lockNow().Unlock()
   return s.measuringFinished
}

func (s *SharedState) LatestVolume() int {
   defer s.lockNow().Unlock()
   return s.volume
}

SharedState is a struct that holds all the state shared between our Goroutines. Its methods are responsible for synchronising access to that state.

An important advantage is that the state cannot be accessed in any other way. When multiple people are working on the same code and some are unfamiliar with its inner workings, this reduces the risk of direct state access without synchronisation. Obviously, for this concept to work the struct has to have unexported fields 👍 and sit in a separate package (which is not the case in this example 👎).

Also, encapsulating this responsibility in one place means that our timing loop (below) reads better and does not take on yet another responsibility.

func(bpm param.Bpm, performer metronome.BeatPerformer) {

   sharedState := NewSharedState()

   // continually and sequentially measure volume
   go func() {
      for sharedState.KeepMeasuring() {
         sharedState.NewVolumeMeasurement(volumeMeter())
      }
      sharedState.MeasuringFinished()
   }()

   // wait (by polling) for the first measurement to come through
   for !sharedState.VolumeMeasuredAtLeastOnce() {
   }

   ticker := time.NewTicker(bpm.Interval())
   defer ticker.Stop()

   for beatCount := 0; beatCount < numberOfBeats; beatCount++ {
      performer(beatCount, sharedState.LatestVolume())
      <-ticker.C
   }

   sharedState.RequestTermination()

   // wait (by polling) for the measuring Goroutine to finish
   for !sharedState.HasMeasuringFinished() {
   }
}

Note: This is a fairly literal translation of the previous implementation, which used the atomic package. I could instead have used two separate mutexes: one to block execution until the first volume measurement arrives, and another to wait for the measuring Goroutine to terminate.
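
A rough sketch of that alternative, assuming a mutex is used as a gate that the measuring Goroutine opens once the first value is stored. Go allows one Goroutine to lock a sync.Mutex and another to unlock it. The measureOnce and waitForFirstMeasurement functions are hypothetical stand-ins, not code from the solution above.

```go
package main

import (
	"fmt"
	"sync"
)

// measureOnce is a hypothetical stand-in for the real volume meter.
func measureOnce() int {
	return 42
}

// waitForFirstMeasurement blocks on a mutex, instead of polling in a loop,
// until the measuring goroutine has stored its first value.
func waitForFirstMeasurement() int {
	var (
		firstMeasurement sync.Mutex // held until the first value is stored
		volume           int
	)

	firstMeasurement.Lock() // close the gate before starting the goroutine

	go func() {
		volume = measureOnce()
		firstMeasurement.Unlock() // open the gate from the measuring goroutine
	}()

	firstMeasurement.Lock() // blocks here until the goroutine unlocks
	defer firstMeasurement.Unlock()
	return volume
}

func main() {
	fmt.Println(waitForFirstMeasurement()) // prints 42
}
```

The second Lock call parks the calling Goroutine rather than spinning, which is the main difference from the polling loops in the timing code.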

Let's have a look at the execution diagram:

Interactive diagram

Conclusion

Mutexes give you fine-grained control over how to synchronise shared access. While writing Java, I often treated the mutex as a low-level building block for more sophisticated synchronisation mechanisms.

Initially, while learning Go, I thought I would simply carry over my mutex experience. As the learning progressed, I came to understand that the mutex is not the first port of call for building concurrent applications in Go.

Channels combined with Goroutines are the preferred option. They are not, however, a drop-in replacement for any of the other concurrency tools that I have shown you so far. They require a fresh look at your problem and, sometimes, a significantly different architecture.

