This is a post in the
metronome-cacophony series.
Other posts in this series:
- May 01, 2018 - Introduction
- May 01, 2018 - Basic synchronous solution
- May 01, 2018 - Ticker
- May 01, 2018 - Goroutine
- May 01, 2018 - WaitGroup
- May 01, 2018 - Sharing state
- May 01, 2018 - Atomicity
- May 01, 2018 - Mutex
- May 01, 2018 - Channels introduction
- May 01, 2018 - Channel select
- May 01, 2018 - Goroutines and channels
- May 01, 2018 - Solution with channel
- May 01, 2018 - Videos and final word
- May 01, 2018 - Appendices
Metronome's Cacophony (11/14) - concurrency in Go - Goroutines and channels
Asynchronous approach (ctd)
Channels (ctd)
Channel and Goroutine
None of the previous examples demonstrated channels in the environment where they make the most sense: a concurrent one. My intention was to keep the examples as simple as possible while demonstrating how channels behave in synchronous code. What you have learned so far about channels applies to Goroutines too, so none of that time was wasted.
Let's do this!
```go
package main

import "fmt"

func main() {
	words := make(chan string)

	go func() {
		words <- "hello"
		words <- "world"
		close(words)
	}()

	for word := range words {
		fmt.Println(word)
	}
}
```
This is a variation of one of the previous snippets. Notice that the channel is unbuffered, but this time sending messages won't cause a deadlock, because a receiver is running concurrently. Here's what happens, with both columns progressing at the same time:
Main | Goroutine |
---|---|
Create an unbuffered channel | |
Start Goroutine | Attempt to send "hello" via the channel (blocks until a receiver is ready) |
Enter for-range loop (receiver) | |
Receive message "hello" | Message "hello" sent; attempt to send "world" (blocks again) |
Print message "hello" | |
Receive message "world" | Message "world" sent; close channel |
Print message "world" | Goroutine ends |
For-range loop ends as the channel is drained and closed | |
```
hello
world

Process finished with exit code 0
```
Notice how convenient the `for-range` construct is. We don't have to use `WaitGroup` or `Mutex` synchronisation to wait for the Goroutine to join the main thread of execution. The `for-range` loop blocks main execution until all messages are drained and the channel is closed. Once we leave the loop, we can be sure the Goroutine above has sent everything it had and has no work left to do.
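For comparison, here is a rough sketch of what the for-range loop does for us, written by hand with the two-value receive form (`word, ok := <-words`). The helper name `collect` is made up for this illustration and is not part of the original code.

```go
package main

import "fmt"

// collect is a hypothetical helper: it drains a channel by hand,
// doing roughly what a for-range loop over the channel does.
func collect(words <-chan string) []string {
	var received []string
	for {
		// ok becomes false once the channel is closed and drained.
		word, ok := <-words
		if !ok {
			return received
		}
		received = append(received, word)
	}
}

func main() {
	words := make(chan string)

	go func() {
		words <- "hello"
		words <- "world"
		close(words) // without close, the receive in collect never sees ok == false
	}()

	for _, word := range collect(words) {
		fmt.Println(word)
	}
}
```

The for-range form hides the `ok` check, which is why forgetting to close the channel leaves the loop waiting forever.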
Pipeline pattern for chaining multiple Goroutines with channels
This simple pattern emerged from my code as I was solving a concurrency problem. Only later did I learn that it could be described as a pipeline pattern.
Let's look at code that transforms words as they pass through chained Goroutines (the output of one Goroutine is fed into the next).
Complete code on GitHub.
```go
package main

import (
	"fmt"
)

// reverseLetters returns the input string with its letters in reverse order.
// For brevity, ASCII characters only (https://en.wikipedia.org/wiki/ASCII).
func reverseLetters(input string) (output string) {
	for i := len(input) - 1; i >= 0; i-- {
		output += string(input[i])
	}
	return
}

// wrapString wraps the input string after every wrapAtLength characters and
// returns all wrapped segments as a slice of strings.
// For brevity, ASCII characters only (https://en.wikipedia.org/wiki/ASCII).
// wrapAtLength is assumed to be positive.
func wrapString(wrapAtLength int, input string) (output []string) {
	i := 0
	for ; i < len(input)-wrapAtLength; i += wrapAtLength {
		output = append(output, input[i:i+wrapAtLength])
	}
	// Append whatever is left, even if it is a single character.
	if i < len(input) {
		output = append(output, input[i:])
	}
	return
}
```
Don't focus too much on the above. These are just convenience helpers, unrelated to concurrency. I have extracted them into separate functions because each serves a distinct, generic purpose (albeit simplified, as they do not support Unicode). Thanks to that, the Goroutines are easier to read, which is my goal.
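If Unicode input matters, one possible variation (not the code used in this series) converts the string to a slice of runes before reversing. The name `reverseRunes` is hypothetical. It handles multi-byte code points, but it still treats combining characters as separate runes, so take it as a sketch rather than full Unicode support.

```go
package main

import "fmt"

// reverseRunes is a hypothetical Unicode-aware variant of reverseLetters.
// It reverses code points (runes) rather than bytes.
func reverseRunes(input string) string {
	runes := []rune(input)
	// Swap runes from both ends, moving towards the middle.
	for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
		runes[i], runes[j] = runes[j], runes[i]
	}
	return string(runes)
}

func main() {
	fmt.Println(reverseRunes("apple"))
	fmt.Println(reverseRunes("h\u00e9llo")) // \u00e9 is a two-byte character in UTF-8
}
```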
```go
// service reversing letters (ASCII only, oversimplifying it - use English alphabet)
func letterReversingService(inputStrings chan string) (outputStrings chan string) {
	outputStrings = make(chan string)
	go func() {
		for inputString := range inputStrings {
			outputStrings <- reverseLetters(inputString)
		}
		close(outputStrings)
	}()
	return
}

// service wrapping words at given length (ASCII only, oversimplifying it - use English alphabet)
func textWrappingService(wrapAt int, inputStrings chan string) (outputStrings chan string) {
	outputStrings = make(chan string)
	go func() {
		for inputString := range inputStrings {
			wrapped := wrapString(wrapAt, inputString)

			// notice that the following for-range loop iterates over a slice, not a channel
			for _, segment := range wrapped {
				outputStrings <- segment
			}
		}
		close(outputStrings)
	}()
	return
}
```
In the above part, the function `letterReversingService` creates a Goroutine which invokes `reverseLetters` for each of the incoming strings. Similarly, `textWrappingService` creates a Goroutine which invokes `wrapString` for each of the incoming strings. The difference between the two is that the latter service can produce more than one string per input, should the string be longer than the `wrapAt` argument. Both functions can be classed as examples of the channel pipeline design pattern.
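Both services share the same shape: take an input channel, start a Goroutine, and return an output channel that the Goroutine closes once the input is drained. As a sketch of that shape only, the hypothetical helper `stage` below factors it out. The name, the `func(string) []string` signature and the directional channel types (`<-chan string`) are my assumptions, not part of the original code.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical generic pipeline step. It applies transform to
// every incoming string and forwards each resulting string downstream.
func stage(input <-chan string, transform func(string) []string) <-chan string {
	output := make(chan string)
	go func() {
		// Closing output once input is drained lets the next stage's
		// for-range loop terminate as well.
		defer close(output)
		for s := range input {
			for _, result := range transform(s) {
				output <- result
			}
		}
	}()
	return output
}

func main() {
	source := make(chan string)
	upper := stage(source, func(s string) []string {
		return []string{strings.ToUpper(s)}
	})
	split := stage(upper, func(s string) []string {
		return strings.Fields(s) // one input may yield several outputs
	})

	go func() {
		source <- "hello world"
		source <- "pipeline"
		close(source)
	}()

	for s := range split {
		fmt.Println(s)
	}
}
```

Returning a receive-only channel means callers can read from a stage's output but cannot send on it or close it by accident.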
1var words = []string{
2 "apple",
3 "cucumber",
4 "onion",
5 "cabbage",
6 "aubergine",
7}
8
9func main() {
10 textChan := make(chan string)
11 wrappedChan := textWrappingService(5, textChan)
12 reversedChan := letterReversingService(wrappedChan)
13
14 go func() {
15 for _, text := range words {
16 textChan <- text
17 }
18 close(textChan)
19 }()
20
21 for word := range reversedChan {
22 fmt.Println(word)
23 }
24
25}
26
The function `main` ends up with three channels: it creates `textChan` itself and receives the other two from the services. The words fed into `textChan` are transformed in `textWrappingService` and the result is sent to `wrappedChan`. That channel is passed into `letterReversingService`, which transforms the strings further. The final result is available on `reversedChan`.
Notice that at line 13 the services with their Goroutines and channels are already created, but both Goroutines are waiting for incoming messages. Work does not start until we feed the words into `textChan`.
Line 14 starts a Goroutine running an anonymous function, which feeds the words. The Goroutine is necessary because the channels are unbuffered and, at that point, no receiver is attached to the final channel yet: feeding from `main` directly would block before the `for-range` loop is ever reached.
Once we start feeding the words, the Goroutines work concurrently while we progress to the `for-range` loop. Initially, it is likely that no message will be ready yet. The loop will wait, then process the messages as they arrive.
At the end of the Goroutine that feeds the words, we close `textChan`. Because `textWrappingService` uses `for-range`, its loop ends at that point and the service closes its output channel. That, in turn, means that `letterReversingService` closes its output channel when its work completes. Finally, in the main code, we wait for `reversedChan` to drain. After this loop we are certain that we have processed all messages, that no more will be sent (the channel is closed), and that the Goroutines have finished their work cleanly.
What I particularly like about this example is that you can assess, at a glance, how the workers are composed. Lines 10 to 12 show the sequence of transformations that will be applied. You can see the parameters each transformer is given (`5` as the wrapping length), and you can see the source of the input as well as what happens to the output (printing). To make it even better, the feeding could be extracted into a separate function, making the code more declarative.
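As a sketch of that idea, the feeding Goroutine could live in a small function that returns the channel it feeds. The name `feedWords` is hypothetical, and the `main` below only demonstrates the feeder on its own. It returns a plain `chan string` so that it could be passed to the services exactly as they are declared in this post.

```go
package main

import "fmt"

// feedWords is a hypothetical source stage: it sends every word on a new
// channel from its own Goroutine and closes the channel afterwards.
func feedWords(words []string) chan string {
	output := make(chan string)
	go func() {
		for _, word := range words {
			output <- word
		}
		close(output)
	}()
	return output
}

func main() {
	words := []string{"apple", "cucumber", "onion"}

	// With the services from this post the wiring would read, top to bottom:
	//   textChan := feedWords(words)
	//   wrappedChan := textWrappingService(5, textChan)
	//   reversedChan := letterReversingService(wrappedChan)
	for word := range feedWords(words) {
		fmt.Println(word)
	}
}
```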
For completeness, here is the output:
```
elppa
mucuc
reb
noino
abbac
eg
rebua
enig

Process finished with exit code 0
```
I also like the ease with which you can compose a different solution just by "rewiring" the input and output channels.
```go
func main() {
	textChan := make(chan string)
	reversedChan := letterReversingService(textChan)
	wrappedChan := textWrappingService(5, reversedChan)

	go func() {
		for _, text := range words {
			textChan <- text
		}
		close(textChan)
	}()

	for word := range wrappedChan {
		fmt.Println(word)
	}
}
```
```
elppa
rebmu
cuc
noino
egabb
ac
enigr
ebua

Process finished with exit code 0
```
Conclusion
Let's have another look at Go's mantra:
Do not communicate by sharing memory; instead, share memory by communicating.
How do I interpret this?
Instead of relying on shared, synchronised memory in your code to communicate between Goroutines,
you should let Goroutines own a state.
Then, you can implement a state change by communicating with the Goroutine.
When you communicate with that Goroutine it will change the state it owns accordingly.
That relieves you from synchronising access as there is no such thing as shared state.
The state can change owner.
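To make that interpretation concrete, here is a minimal sketch, not taken from the metronome code, of a Goroutine that owns a counter. The names `counterService`, `increments` and `total` are made up for the illustration.

```go
package main

import "fmt"

// counterService is a hypothetical example of a Goroutine owning state.
// The count variable is only ever touched by the Goroutine started here,
// so no Mutex or atomic operation is needed.
func counterService(increments <-chan int) <-chan int {
	total := make(chan int)
	go func() {
		count := 0 // owned exclusively by this Goroutine
		for n := range increments {
			count += n
		}
		total <- count // hand the final value over to the reader
		close(total)
	}()
	return total
}

func main() {
	increments := make(chan int)
	total := counterService(increments)

	// Several Goroutines request a state change by sending a message.
	done := make(chan struct{})
	for worker := 0; worker < 3; worker++ {
		go func() {
			for i := 0; i < 100; i++ {
				increments <- 1
			}
			done <- struct{}{}
		}()
	}
	for worker := 0; worker < 3; worker++ {
		<-done
	}
	close(increments)

	fmt.Println(<-total) // prints 300
}
```

Three Goroutines change the counter concurrently, yet none of them ever reads or writes `count` directly.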
How does it manifest itself in the above examples?
First of all, the obvious: the parameter `wrapAt` is owned by the service it applies to.
The service didn't have to reach into some parameter repository to obtain it or, even worse, keep reaching into it regularly.
As such, there was no need for access synchronisation or waiting.
Secondly, each message, as it makes its way through the system, is processed by at most one Goroutine at a time: the Goroutine that owns that message at that moment.
Note: If you like the notion of controlling state ownership explicitly, have a look at another programming language: Rust. Its semantics allow for transferring ownership and lending state.
As the word "apple" goes through `textWrappingService`, `letterReversingService` waits in anticipation for some message to arrive.
Once "apple" is delivered to `letterReversingService`, the `textWrappingService` has nothing to do with that word anymore.
The channel cannot be rewound.
Also, the "apple" that leaves the `textWrappingService` is a copy of the original "apple".
There is no mutation of state occurring here.
As mentioned in previous parts, immutability makes concurrent solutions safer. It is a desirable property in concurrent systems.
We can now apply what we've learnt and rewrite the metronome's engine. I have to admit, I have neglected it for a bit.