Metronome's Cacophony (5/14) - concurrency in Go - WaitGroup
Asynchronous approach (ctd)
WaitGroup
Most asynchronous activities need a mechanism to signal completion, optionally delivering an outcome. We need it so that we can gather the results of these Goroutines, or simply proceed once their effects have taken place, which includes the graceful application termination scenario.
Note: I am saying most and not all, because we also have "fire and forget" model where we are not bothered about this signal, or the outcome, at least not in an immediate time-frame (I would put UDP here as an example).
One of Go's tools you can use to coordinate task execution is sync.WaitGroup.
It is particularly good for signalling completion of one or more tasks.
I'm going to modify the previous "shower" example so that instead of an arbitrary wait of 1 second it waits only as long as it needs to.
 1  package main
 2
 3  import (
 4      "log"
 5      "sync"
 6  )
 7
 8  var groupActivities sync.WaitGroup
 9
10  func do(activity string) {
11      groupActivities.Add(1)
12      go func() {
13          log.Println(activity)
14          groupActivities.Done()
15      }()
16  }
17
18  func main() {
19      do("walk into bathroom")
20      do("undress")
21      do("take shower")
22      do("put pyjamas on")
23      do("say good night to your host")
24
25      groupActivities.Wait()
26
27      log.Println("program ended")
28  }
Output:
2018/05/01 22:27:28 say good night to your host
2018/05/01 22:27:28 undress
2018/05/01 22:27:28 walk into bathroom
2018/05/01 22:27:28 put pyjamas on
2018/05/01 22:27:28 take shower
2018/05/01 22:27:28 program ended

Process finished with exit code 0
We didn't have to guess when the tasks ended. We could take a shower for as long as needed, albeit in PJs.
This quite typical scenario creates a number of Goroutines within function do.
Before each go invocation (@line 12), we increment the WaitGroup's internal task counter by 1.
At the end of the Goroutine (but still within the Goroutine!) we have to invoke WaitGroup.Done() to decrement this counter.
@line 25 we want to know that all Goroutines have completed before we terminate the program.
A single invocation of WaitGroup.Wait() ensures that we won't progress past this point until all of them are finished.
WaitGroup.Wait() blocks execution until the internal counter reaches 0.
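The same Add / Done / Wait pattern also covers the "gather results" case mentioned at the start. Below is a minimal sketch, not part of the metronome code: squareAll is a hypothetical helper made up for this illustration, and it assumes each Goroutine writes only to its own slot of a pre-sized slice so that no extra locking is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper: it squares every input in its own
// Goroutine and returns once all of them have called Done.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup

	for i, n := range inputs {
		wg.Add(1) // count the task before starting it
		go func(i, n int) {
			results[i] = n * n // each Goroutine writes only to its own slot
			wg.Done()          // last statement: this task is finished
		}(i, n)
	}

	wg.Wait() // blocks until the counter is back at 0
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // prints "[1 4 9 16]"
}
```

The results are read only after Wait() returns, which is what makes reading the slice from the calling Goroutine safe here.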
Note: It is important to call WaitGroup.Add() before we start the Goroutine!
Otherwise, the code may reach the line with WaitGroup.Wait() before the increment happens.
The internal counter would then not represent the number of tasks, and WaitGroup.Wait() would not wait for some of them.
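One way to make this ordering hard to get wrong is to register all tasks in a single call before any Goroutine starts, since Add() accepts any count. The sketch below is only an illustration under that assumption: runAll and the done slice are hypothetical names, not part of the shower example.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical helper that runs every task in its own Goroutine
// and returns only when all of them have finished.
func runAll(tasks []func()) {
	var wg sync.WaitGroup

	// Register all tasks up front, in the Goroutine that later calls Wait.
	// Calling wg.Add(1) inside each new Goroutine instead would race with
	// wg.Wait(): Wait could see a counter of 0 and return too early.
	wg.Add(len(tasks))

	for _, task := range tasks {
		go func(task func()) {
			defer wg.Done()
			task()
		}(task)
	}

	wg.Wait()
}

func main() {
	done := make([]bool, 5)
	tasks := make([]func(), len(done))
	for i := range tasks {
		i := i // capture a per-iteration copy of the index
		tasks[i] = func() { done[i] = true }
	}

	runAll(tasks)

	finished := 0
	for _, ok := range done {
		if ok {
			finished++
		}
	}
	fmt.Println("finished tasks:", finished) // prints "finished tasks: 5"
}
```

Calling Add(1) right before each go statement, as in the shower example, is equally valid. What matters is that the increment happens before the Goroutine starts and before Wait() is reached.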
It is also important to remember the call to WaitGroup.Done(): omitting it means the internal counter never reaches 0, so the call to WaitGroup.Wait() blocks forever, causing a deadlock.
WaitGroup.Done() has to be placed at the very end (after any cleanup) and inside the Goroutine, because that is the right moment to indicate the task's completion.
Placing it immediately after the Goroutine will most likely decrement the counter prematurely, which means WaitGroup.Wait() may return while the Goroutine is still running. If the program then exits, the lingering Goroutine is cut off abruptly.
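A common way to get "at the very end, after any cleanup" without relying on statement order is to defer Done() as the first thing inside the Goroutine. The following is a minimal sketch under that assumption. runTask and the events slice are hypothetical and exist only to record the order of steps.

```go
package main

import (
	"fmt"
	"sync"
)

// runTask is a hypothetical helper that runs one task in a Goroutine and
// returns the order in which its steps happened.
func runTask() []string {
	var wg sync.WaitGroup
	var events []string

	wg.Add(1)
	go func() {
		// Deferred first, so it runs last: after the work and the cleanup,
		// and even if the Goroutine returns early.
		defer wg.Done()
		defer func() { events = append(events, "cleanup") }()

		events = append(events, "work")
	}()
	// Calling wg.Done() here instead would let Wait return before the work is done.

	wg.Wait()
	events = append(events, "wait returned")
	return events
}

func main() {
	fmt.Println(runTask()) // prints "[work cleanup wait returned]"
}
```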
Java analogy: If you are using Java 1.5+ you may be in luck, as a few APIs help you achieve a similar effect.
java.util.concurrent.Semaphore maintains an internal count which can be incremented (.release()), decreased (.acquire()) and can wait for all operations to complete (acquire(numberOfTasks)). It seems reversed because the Semaphore conceptually maintains a count of available permits rather than a count of busy Goroutines.
java.util.concurrent.CountDownLatch is a non-resettable countdown. Both CountDownLatch and Semaphore have to be initialised with a count up-front.
Also, consider java.util.concurrent.CyclicBarrier, just keep in mind that it needs to know the number of threads up-front.
Starting with Java 1.7 there is also java.util.concurrent.Phaser, which is the most flexible construct.
As you will see in the docs, each of the above is way more complex than Go's WaitGroup.
Naive solution that knows when the work is done
We can now try using this fresh knowledge to slightly improve our last solution. We know already that we will need to do more rework but this small iteration may help visualise the effect.
func(bpm param.Bpm, performer metronome.BeatPerformer) {

	// declare WaitGroup (zero-value equivalent struct is fine)
	taskGroup := sync.WaitGroup{}

	// remember that deferred call will be executed upon exiting this function
	// WaitGroup, wait for all tasks to invoke .Done()
	defer taskGroup.Wait()

	ticker := time.NewTicker(bpm.Interval())
	defer ticker.Stop()

	for beatCount := 0; beatCount < numberOfBeats; beatCount++ {

		// WaitGroup, make a note of additional task I want to wait for
		taskGroup.Add(1)
		go func(beatCount int) {
			volume := volumeMeter()
			performer(beatCount, volume)

			// WaitGroup, this task is done, no need to wait for it
			taskGroup.Done()
		}(beatCount)

		<-ticker.C
	}
}
Note: the above function uses defer twice. Just to clarify: deferred calls are placed on a stack.
Stacks are FILO (first in, last out) structures.
It means that upon this function's exit, ticker.Stop() will be called first, followed by taskGroup.Wait().
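The ordering can be checked in isolation. In this small sketch deferOrder is a hypothetical function, and the two strings merely stand in for the real taskGroup.Wait() and ticker.Stop() calls, recording the order in which the deferred calls run.

```go
package main

import "fmt"

// deferOrder is a hypothetical function that records the order in which its
// deferred calls run. The strings stand in for the real calls.
func deferOrder() (calls []string) {
	defer func() { calls = append(calls, "taskGroup.Wait") }() // deferred first, runs last
	defer func() { calls = append(calls, "ticker.Stop") }()    // deferred last, runs first
	return calls
}

func main() {
	fmt.Println(deferOrder()) // prints "[ticker.Stop taskGroup.Wait]"
}
```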
The code will produce a similar trace: Interactive diagram
So it is still a mess, but at least we get to capture every single bit of it thanks to WaitGroup.
Conclusion
I think it would be fair to state that this tiny, but malfunctioning and time-gobbling, solution needs to end up in the bin.
I want to redo the whole structure of our metronome's engine by creating a separate, single Goroutine with a life-span nearly equal to that of the application. That Goroutine would continuously measure the volume and, once the latest volume is known, update some shared variable. That shared variable could then be read by the main timing loop to adjust the volume of the beat performance.
Other posts in metronome-cacophony series
- Introduction
- Basic synchronous solution
- Ticker
- Goroutine
- WaitGroup
- Sharing state
- Atomicity
- Mutex
- Channels introduction
- Channel select
- Goroutines and channels
- Solution with channel
- Videos and final word
- Appendices