The `chan` type in Go is a mechanism for sending data from one goroutine to another. For example:
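```go
package main

import "fmt"

func main() {
	c := make(chan string)
	go sayHello(c)
	// printHello runs on the main goroutine, so main can't return
	// before the message has been received and printed
	printHello(c)
}

// sayHello can only send on msgChan
func sayHello(msgChan chan<- string) {
	msgChan <- "hello"
}

// printHello can only receive from msgChan
func printHello(msgChan <-chan string) {
	fmt.Println(<-msgChan)
}
```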
Channels can be bidirectional (`chan string`) or directional (`chan<- string` for a sender, `<-chan string` for a receiver), and they operate like FIFO queues.
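Here’s a small sketch of both rules (the `produce` and `consume` names are made up for illustration): a send-only parameter can only be sent on (or closed), a receive-only parameter can only be received from, and values come out in the order they went in. A plain `chan int` converts implicitly to either direction when it’s passed in.

```go
package main

import "fmt"

// produce can only send on out; receiving from it would not compile.
func produce(out chan<- int, values ...int) {
	for _, v := range values {
		out <- v
	}
	close(out) // closing is allowed on a send-only channel
}

// consume can only receive from in; sending on it would not compile.
func consume(in <-chan int) []int {
	var got []int
	for v := range in { // ends once the channel is closed and drained
		got = append(got, v)
	}
	return got
}

func main() {
	c := make(chan int) // bidirectional, passed as chan<- and <-chan below
	go produce(c, 1, 2, 3)
	fmt.Println(consume(c)) // [1 2 3]: same order as they were sent
}
```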
There are also unbuffered (`make(chan string)`) and buffered (`make(chan string, 10)`) channels. The difference between the two is that a send on an unbuffered channel blocks until a receiver takes the value, while a send on a buffered channel only blocks once the buffer is full. For example:
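```go
package main

func main() {
	c := make(chan string, 2)
	// these two sends won't block: the buffer has room for both
	c <- "hi"
	c <- "there"
	// this one will block because the buffer (capacity 2) is full;
	// with nothing ever receiving, the runtime aborts with
	// "fatal error: all goroutines are asleep - deadlock!"
	// (with a capacity of 3, this send wouldn't block either)
	c <- ":)"
}
```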
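If you want to see the buffer fill up without crashing the program, a `select` with a `default` case turns a send into a non-blocking attempt. The sketch below assumes a hypothetical `trySend` helper (it’s not part of the standard library) and uses the built-in `len` and `cap` to inspect the buffer:

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts a non-blocking send: it returns
// false instead of blocking when there's no free buffer slot and no
// receiver waiting.
func trySend(c chan<- string, v string) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan string, 2)
	fmt.Println(trySend(c, "hi"))    // true: the buffer has room
	fmt.Println(trySend(c, "there")) // true: the buffer is now full
	fmt.Println(trySend(c, ":)"))    // false: a plain send would block here
	fmt.Println(len(c), cap(c))      // 2 2
}
```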
Similarly, on the receiving end, a receive won’t block until the buffer is empty:
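```go
package main

import "fmt"

func main() {
	c := make(chan string, 2)
	c <- "hi"
	c <- "there"
	// both receives return immediately because the buffer isn't empty;
	// prints "hi there" (FIFO order)
	fmt.Printf("%s %s\n", <-c, <-c)
}
```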
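The same non-blocking trick works on the receiving side. In this sketch (again with a hypothetical helper, `tryRecv`), each receive takes the oldest buffered value and `len(c)` drops by one, until the buffer is empty and a plain receive would block:

```go
package main

import "fmt"

// tryRecv (hypothetical helper) is a non-blocking receive: ok is false
// when a plain <-c would block (nothing buffered, no sender waiting)
// or when the channel has been closed.
func tryRecv(c <-chan string) (string, bool) {
	select {
	case v, open := <-c:
		return v, open
	default:
		return "", false
	}
}

func main() {
	c := make(chan string, 2)
	c <- "hi"
	c <- "there"
	fmt.Println(len(c)) // 2

	v := <-c
	fmt.Println(v, len(c)) // hi 1

	v = <-c
	fmt.Println(v, len(c)) // there 0

	// the buffer is empty now, so a plain <-c would block forever here
	_, ok := tryRecv(c)
	fmt.Println(ok) // false
}
```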
Another important property of channels is that when there are several receivers, each message is delivered to only one of them (so the data flow is one-to-one or many-to-one). For example:
```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	c := make(chan string)
	wg.Add(1)
	go printHello(c, 1, &wg)
	wg.Add(1)
	go printHello(c, 2, &wg)
	sayHello(c)
	// this will deadlock: only one printHello receives "hello", the
	// other one waits on <-msgChan forever, so wg.Wait never returns
	wg.Wait()
}

func sayHello(msgChan chan<- string) {
	msgChan <- "hello"
}

func printHello(msgChan <-chan string, id int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("%s from %d\n", <-msgChan, id)
}
```
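One way to make the example above terminate, assuming not every receiver needs to see every message, is to close the channel once all messages are sent, so receivers that get nothing leave their `range` loop instead of waiting forever. This sketch uses a hypothetical `countDeliveries` helper that counts what the receivers got: the total always equals the number of messages sent, never messages times receivers, though which receiver gets which message is up to the scheduler.

```go
package main

import (
	"fmt"
	"sync"
)

// countDeliveries (hypothetical helper) starts n receivers on one
// unbuffered channel, sends every message once, closes the channel and
// returns how many messages were received in total.
func countDeliveries(msgs []string, n int) int {
	c := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0
	for id := 1; id <= n; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for m := range c { // exits once c is closed and drained
				fmt.Printf("%s from %d\n", m, id)
				mu.Lock()
				total++
				mu.Unlock()
			}
		}(id)
	}
	for _, m := range msgs {
		c <- m
	}
	close(c)
	wg.Wait()
	return total
}

func main() {
	// 2 messages and 2 receivers: 2 deliveries in total, not 4
	fmt.Println(countDeliveries([]string{"hello", "hello"}, 2)) // 2
}
```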
This can be a useful feature, but sometimes you need a one-to-many/many-to-many communication channel, and unfortunately this is not available natively in Go (as of 1.18).
So you will most likely end up using a package like RxGo or implementing your own one-to-many/many-to-many interface.
I usually tend to use whatever packages I find that provide the features I’m interested in, but sometimes the implementation is complicated and difficult to debug. Or sometimes I just want to learn how it’s done, and I end up implementing the functionality from scratch (not the wisest thing to do 😆).
So how would you go about implementing the aforementioned pattern? Well, it’s not that complicated. First, let’s start with an interface:
```go
// NewPiper creates a Piper that will
// pipe data from a source chan to many receivers.
//
// You could argue for returning a struct,
// but I find interfaces much more flexible,
// because we can create mocks for them using https://github.com/golang/mock.
func NewPiper(source <-chan Data) Piper {
	return nil // placeholder for now
}

type Piper interface {
	// Pipe returns a new channel that receives every message from the source.
	Pipe() <-chan Data
}

// We're not using generics yet,
// but we want to abstract the data type for reusability.
type Data struct {
	V interface{}
}
```
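As for the mocking argument: anything that implements `Pipe()` can stand in for the real thing. The sketch below uses a hand-written fake rather than a gomock-generated mock, and `fakePiper` and `firstValue` are hypothetical names for illustration:

```go
package main

import "fmt"

type Data struct {
	V interface{}
}

type Piper interface {
	Pipe() <-chan Data
}

// fakePiper (hypothetical test double): every Pipe() call returns its own
// channel pre-loaded with the same canned values and then closed, so code
// that depends on Piper can be tested without a real source channel.
type fakePiper struct {
	values []Data
}

func (f *fakePiper) Pipe() <-chan Data {
	c := make(chan Data, len(f.values))
	for _, d := range f.values {
		c <- d
	}
	close(c)
	return c
}

// compile-time check that fakePiper satisfies Piper
var _ Piper = (*fakePiper)(nil)

// firstValue stands in for code under test; it only knows the interface.
func firstValue(p Piper) interface{} {
	return (<-p.Pipe()).V
}

func main() {
	fmt.Println(firstValue(&fakePiper{values: []Data{{V: "hey"}}})) // hey
}
```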
So that looks pretty simple. But how about the implementation? That’s not complicated either.
There are a few ways to go about it, and the one I find easiest to understand is some sort of bookkeeping: we return a new channel every time `Pipe()` is called, keep track of these channels, and send each message to all of them whenever the source channel sends.
So let’s create a struct that implements our interface:
```go
func NewPiper(source <-chan Data) Piper {
	return &piper{
		chans: []chan<- Data{},
	}
}

type piper struct {
	chans []chan<- Data
}

func (p *piper) Pipe() <-chan Data {
	c := make(chan Data)
	p.chans = append(p.chans, c)
	return c
}
```
Then let’s add some logic to read from the source and forward the messages to our channels:
```go
func NewPiper(source <-chan Data) Piper {
	p := &piper{
		chans: []chan<- Data{},
	}
	go p.setupRcvLoop(source)
	return p
}

func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		for _, c := range p.chans {
			c <- d
		}
	}
}
```
We should probably also make sure we close the channels we create when the source is closed:
```go
func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		for _, c := range p.chans {
			c <- d
		}
	}
	// source got closed, so close the receivers
	for _, c := range p.chans {
		close(c)
	}
}
```
And we should address concurrency concerns, since `Pipe()` can append to `p.chans` while the receive loop is iterating over it:
```go
type piper struct {
	mux   sync.RWMutex
	chans []chan<- Data
}

func (p *piper) Pipe() <-chan Data {
	p.mux.Lock()
	defer p.mux.Unlock()
	c := make(chan Data)
	p.chans = append(p.chans, c)
	return c
}

func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		p.mux.RLock()
		for _, c := range p.chans {
			c <- d
		}
		p.mux.RUnlock()
	}
	// source got closed, so close the receivers
	// (under the read lock, so a concurrent Pipe() can't race with us)
	p.mux.RLock()
	defer p.mux.RUnlock()
	for _, c := range p.chans {
		close(c)
	}
}
```
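To see everything so far working end to end, including the close handling, here’s a self-contained sketch that bundles the code above with a hypothetical `fanOut` driver. It assumes all pipes are registered before the first message is sent, and it takes the read lock around the final close loop too, so a late `Pipe()` call can’t race with it:

```go
package main

import (
	"fmt"
	"sync"
)

type Data struct {
	V interface{}
}

type Piper interface {
	Pipe() <-chan Data
}

type piper struct {
	mux   sync.RWMutex
	chans []chan<- Data
}

func NewPiper(source <-chan Data) Piper {
	p := &piper{}
	go p.setupRcvLoop(source)
	return p
}

func (p *piper) Pipe() <-chan Data {
	p.mux.Lock()
	defer p.mux.Unlock()
	c := make(chan Data)
	p.chans = append(p.chans, c)
	return c
}

func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		p.mux.RLock()
		for _, c := range p.chans {
			c <- d // blocks until this pipe's receiver takes it
		}
		p.mux.RUnlock()
	}
	p.mux.RLock()
	defer p.mux.RUnlock()
	for _, c := range p.chans {
		close(c)
	}
}

// fanOut (hypothetical driver) registers n pipes, drains each one with
// `for range`, sends values through the source, closes the source and
// returns what every pipe received.
func fanOut(values []string, n int) [][]string {
	src := make(chan Data)
	p := NewPiper(src)
	results := make([][]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		pipe := p.Pipe()
		wg.Add(1)
		go func(i int, pipe <-chan Data) {
			defer wg.Done()
			for d := range pipe { // ends when the piper closes the pipe
				results[i] = append(results[i], d.V.(string))
			}
		}(i, pipe)
	}
	for _, v := range values {
		src <- Data{V: v}
	}
	close(src)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(fanOut([]string{"a", "b", "c"}, 2)) // [[a b c] [a b c]]
}
```

Because the consumers use `for d := range pipe`, they only finish because `setupRcvLoop` closes every pipe once `src` is closed; without that step `wg.Wait()` would never return.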
So far, so good. Let’s test it:
```go
func main() {
	// the source
	c := make(chan Data)
	// the piping
	piper := NewPiper(c)
	p1 := piper.Pipe()
	p2 := piper.Pipe()
	// need to wait for msgs before we exit
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println(<-p1)
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println(<-p2)
	}()
	c <- Data{"hey"}
	wg.Wait()
}
```
We should see `{hey}` printed twice (check DG8eJacsSGl).
Great. It seems to work. But what if one of the receivers blocks because it’s doing some work? Let’s see what happens:
```go
func main() {
	// the source
	c := make(chan Data)
	// the piping
	piper := NewPiper(c)
	p1 := piper.Pipe()
	p2 := piper.Pipe()
	// need to wait for msgs before we exit
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		fmt.Println(<-p1, time.Now())
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println(<-p2, time.Now())
	}()
	c <- Data{"hey"}
	wg.Wait()
}
```
It still works, but it doesn’t look right (check gfjNV3dHfvU):
```
{hey} 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
{hey} 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
```
It seems like both pipes received the message at the same time, a full second after it was sent, even though only the first receiver was busy. That’s not exactly great 😞. But it kind of makes sense if we take a closer look at:
```go
func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		p.mux.RLock()
		for _, c := range p.chans {
			c <- d
		}
		p.mux.RUnlock()
	}
	// source got closed, so close the receivers
	// (under the read lock, so a concurrent Pipe() can't race with us)
	p.mux.RLock()
	defer p.mux.RUnlock()
	for _, c := range p.chans {
		close(c)
	}
}
```
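We’re getting blocked by the first pipe: the loop sends to the pipes one after the other, and a send on an unbuffered channel blocks until there’s a receiver, so `p2` only gets the message after `p1`’s receiver wakes up and takes it. So how do we fix this? Well, the simplest way is to discard a message for any pipe whose receiver isn’t ready. That’s not ideal for every scenario, but it’ll do (see r3s8lJ_JvX1):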
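```go
func (p *piper) setupRcvLoop(source <-chan Data) {
	for d := range source {
		p.mux.RLock()
		for _, c := range p.chans {
			// non-blocking send: if this pipe's receiver isn't ready,
			// the message is dropped for that pipe
			select {
			case c <- d:
			default:
			}
		}
		p.mux.RUnlock()
	}
	// source got closed, so close the receivers
	// (under the read lock, so a concurrent Pipe() can't race with us)
	p.mux.RLock()
	defer p.mux.RUnlock()
	for _, c := range p.chans {
		close(c)
	}
}
```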
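Ok. That didn’t work either 🤯! We’re getting a deadlock. And this makes sense as well: a non-blocking send only succeeds if the receiver is already waiting on the channel. The first receiver is still sleeping when the message arrives, so its copy is discarded, and it then waits on `<-p1` forever. This is not easy to fix. I mean, there’s an easy fix, but it may not be ideal either.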
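The behaviour is easy to reproduce outside the piper. A non-blocking send only succeeds if a receiver is already waiting on the channel or the buffer has a free slot. The sketch below uses a hypothetical `forward` helper that mirrors the inner loop of `setupRcvLoop` above; it also shows the catch with a buffer of one: a second message that arrives before the receiver has read the first is still dropped.

```go
package main

import "fmt"

type Data struct {
	V interface{}
}

// forward (hypothetical helper) offers d to every channel without
// blocking and drops it for any channel that can't take it right now.
// It returns how many channels accepted the value.
func forward(d Data, chans []chan Data) int {
	delivered := 0
	for _, c := range chans {
		select {
		case c <- d:
			delivered++
		default: // no receiver waiting and no free buffer slot: drop
		}
	}
	return delivered
}

func main() {
	unbuffered := make(chan Data)  // nobody is receiving yet
	buffered := make(chan Data, 1) // room for one pending value

	fmt.Println(forward(Data{V: "hey"}, []chan Data{unbuffered})) // 0: dropped
	fmt.Println(forward(Data{V: "hey"}, []chan Data{buffered}))   // 1: kept in the buffer
	fmt.Println(forward(Data{V: "again"}, []chan Data{buffered})) // 0: buffer still full
	fmt.Println((<-buffered).V)                                   // hey
}
```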
We just need to use a buffered channel with size 1 when we create the receivers/pipes (see Ruqmk3QKQT1):
```go
func (p *piper) Pipe() <-chan Data {
	p.mux.Lock()
	defer p.mux.Unlock()
	// a buffer of 1 holds a message until this pipe's receiver is ready
	c := make(chan Data, 1)
	p.chans = append(p.chans, c)
	return c
}
```
And now we get what we expect:
```
{hey} 2009-11-10 23:00:00 +0000 UTC m=+0.000000001
{hey} 2009-11-10 23:00:01 +0000 UTC m=+1.000000001
```
And that’s more or less how you can build a simple one-to-many/many-to-many data flow using `chan` in Go. It obviously gets a lot more complex once you start thinking about different use cases or scenarios, but hopefully the above is good enough as a guide/baseline.
Do check out the transcelestial/chanpiper package if the above is good enough for your use case.