This is a post in the metronome-cacophony series.
Other posts in this series:

Metronome's Cacophony (11/14) - concurrency in Go - Goroutines and channels

Asynchronous approach (ctd)

Channels (ctd)

Channel and Goroutine

None of the previous examples demonstrated channels in the environment where they make most sense - concurrent one. My intention was to keep the examples as simple as possible and, at the same time, demonstrate the use of channels in synchronous way. What you have learned so far about channels will apply to Goroutines, so you didn't waste time by learning about it.

Let's do this!

 1package main
 3import "fmt"
 5func main() {
 6   words := make(chan string)
 8   go func() {
 9      words <- "hello"
10      words <- "world"
11      close(words)
12   }()
14   for word := range words {
15      fmt.Println(word)
16   }

This is a variation of one of the previous snippets. Notice the channel is unbuffered but this time, sending messages won't cause a deadlock. Here's what happens (simultaneously):

Main Goroutine
Create an unbuffered channel
Start Goroutine Attempt to send "hello" via a channel (potentially blocked at first)
Create for-range loop (receiver) Message "hello" sent
Received message "hello" Message "world" sent
Printed message "hello" Close channel
Received message "world" Goroutine ends
Printed message "world"
For-range loop ends as the channel is drained and closed
{: style="margin-bottom: 2em" }
4Process finished with exit code 0

Notice how convenient the for-range construct is. We don't have to use WaitGroup or Mutex synchronisation to wait for Goroutine to join the main thread of execution. for-range will block main execution until all messages are drained and channel closed. Once we leave the loop we can be sure the above Goroutine is no more.

Pipeline pattern for chaining multiple Goroutines with channels

This simple pattern emerged from my code as I was solving a concurrent problem. Only later I've learned that it could be described as a pipeline pattern.

Let's look at code that transforms words as they pass through chained Goroutines (deliverables of one Goroutine are fed into another).

Complete code on GitHub.

 1package main
 3import (
 4   "fmt"
 7// function reversing letters in input string and returning it
 8// for brevity, ASCII characters only (
 9func reverseLetters(input string) (output string) {
10   for i := len(input) - 1; i >= 0; i -- {
11      output += string(input[i])
12   }
13   return
16// function wrapping input string after wrapAtLength characters and
17// returning all wrapped segments as a slice of strings
18// for brevity, ASCII characters only (
19func wrapString(wrapAtLength int, input string) (output []string) {
20   i := 0
21   for ; i < len(input)-wrapAtLength; i += wrapAtLength {
22      output = append(output, input[i:i+wrapAtLength])
23   }
24   if i < len(input) - 1 {
25      output = append(output, input[i:])
26   }
27   return

Don't focus too much on the above. These are just convenience tools not related to concurrency at all. I have extracted them as separate functions because they perform distinct and generic purpose (albeit simplified, due to lack of support for Unicode). Thanks to that the Goroutines are easier to read, which is my goal.

 1// service reversing letters (ASCII only, oversimplifying it - use English alphabet)
 2func letterReversingService(inputStrings chan string) (outputStrings chan string) {
 3   outputStrings = make(chan string)
 4   go func() {
 5      for inputString := range inputStrings {
 6         outputStrings <- reverseLetters(inputString)
 7      }
 8      close(outputStrings)
 9   }()
10   return
13// service wrapping words at given length (ASCII only, oversimplifying it - use English alphabet)
14func textWrappingService(wrapAt int, inputStrings chan string) (outputStrings chan string) {
15   outputStrings = make(chan string)
16   go func() {
17      for inputString := range inputStrings {
18         wrapped := wrapString(wrapAt, inputString)
20         // notice that the following for-range loop iterates over an array, not channel
21         for _, segment := range wrapped {
22            outputStrings <- segment
23         }
24      }
25      close(outputStrings)
26   }()
27   return

In the above part, function letterReversingService creates a Goroutine which will invoke reverseLetters for each of the incoming strings. Similarly function textWrappingService will create a Goroutine which will invoke wrapString for each of the incoming strings. The difference between the two is that the latter service can produce more than one string at a time, should the string's length be larger than wrapAt argument.

Both functions can be classed as examples of channel pipeline design pattern.

 1var words = []string{
 2   "apple",
 3   "cucumber",
 4   "onion",
 5   "cabbage",
 6   "aubergine",
 9func main() {
10   textChan := make(chan string)
11   wrappedChan := textWrappingService(5, textChan)
12   reversedChan := letterReversingService(wrappedChan)
14   go func() {
15      for _, text := range words {
16         textChan <- text
17      }
18      close(textChan)
19   }()
21   for word := range reversedChan {
22      fmt.Println(word)
23   }

Function main creates three channels. The words fed into textChan will be transformed in textWrappingService and the result sent to wrappedChan. That channel is passed into letterReversingService, which transforms it further. Final result will be available on reversedChannel.

Notice that at line 13 the services with their Goroutines and channels are already created, but both Goroutines wait for incoming messages. Work does not start until we feed the words into textChan.

Line 14 shows Goroutine running anonymous function, which feeds the words. Goroutine is necessary as the channels are unbuffered and there is no receiver attached to the final channel.

Once we feed the words, the Goroutines will start working at the same time as we progress to for-range loop. Initially, it is likely there won't be any message ready yet. The loop will wait, then keep processing all messages. At the end of Goroutine that feeds the words we close the textChan. Because textWrappingService also uses for-range it means the textWrappingService will close its output channel when done with it. That, in turn, means that letterReversingService will close its output channel when that work completes.

Finally, in the main code, we wait for draining of reversedChan. After this loop we are certain we have processed all messages, there won't be any more sent (channel is closed and the Goroutines have finished cleanly.

What I particularly like about this example is that you can - at a glance - assess how the workers are composed. The lines from 10 to 12 show you the sequence of transformation that will be applied. You can see the parameters that each transformer is given (5 as wrapping length) and you can see the source of input as what goes on with deliverables (printing). To make it even better the feeding could be extracted as separate function to make it more declarative.

For completeness, here is the output:

10Process finished with exit code 0

I also like the ease with which you can compose different solution by just "rewiring" the input and output channels.

 1func main() {
 2   textChan := make(chan string)
 3   reversedChan := letterReversingService(textChan)
 4   wrappedChan := textWrappingService(5, reversedChan)
 6   go func() {
 7      for _, text := range words {
 8         textChan <- text
 9      }
10      close(textChan)
11   }()
13   for word := range wrappedChan {
14      fmt.Println(word)
15   }
10Process finished with exit code 0


Let's have another look at Go's mantra:

Do not communicate by sharing memory; instead, share memory by communicating.

How do I interpret this?

Instead of relying on shared, synchronised memory in your code to communicate between Goroutines, you should let Goroutines own a state. Then, you can implement a state change by communicating with the Goroutine.
When you communicate with that Goroutine it will change the state it owns accordingly. That relieves you from synchronising access as there is no such thing as shared state. The state can change owner.

How does it manifest itself in the above examples?

First of all the obvious, the parameter wrapAt is owned by the service it is applicable to. The service didn't have to reach some parameter repository to obtain it, or even worse, keep reaching it regularly. As such there was no need for access synchronisation and waiting.

Secondly, each of the messages, as they make their way through the system, are processed by a maximum of one Goroutine at a time. They are processed by a Goroutine that owns that message.

Note: If you like the notion of controlling state ownership explicitly maybe look at another programming language - Rust. Its semantics allows for transferring ownership and lending state.

As word "apple" goes through textWrappingService, letterReversingService waits in anticipation for some message to arrive. Once "apple" is delivered to letterReversingService, the textWrappingService has nothing to do with that word anymore. The channel cannot be rewound. Also, the "apple" that leaves the textWrappingService is a copy of original "apple". There is no mutation of state occurring here.

As mentioned in previous parts, immutability is making concurrent solution safer. It is a desirable property in concurrent systems.

We can now apply what we've learnt and rewrite the metronome's engine. I have to admit, I have neglected it for a bit.

This is a post in the metronome-cacophony series.
Other posts in this series: