小编典典

如何使用频道广播消息

go

我是新手,我正在尝试创建一个简单的聊天服务器,客户端可以在其中向所有连接的客户端广播消息。

在我的服务器中,我有一个goroutine(无限循环),它接受连接,并且所有连接都由一个通道接收。

go func() {
    for {
        conn, _ := listener.Accept()
        ch <- conn
        }
}()

然后,我为每个连接的客户端启动一个处理程序(goroutine)。在处理程序内部,我尝试通过迭代通道来广播到所有连接。

for c := range ch {
    conn.Write(msg)
}

但是,我无法广播,因为(我认为通过阅读文档)通道需要在迭代之前关闭。我不确定何时应该关闭频道,因为我要不断接受新的连接,而关闭频道将不允许我这样做。如果有人可以帮助我,或者提供更好的方法向所有连接的客户端广播消息,将不胜感激。


阅读 356

收藏
2020-07-02

共1个答案

小编典典

您正在执行的是扇出模式,也就是说,多个端点正在侦听单个输入源。这种模式的结果是,只要输入源中有消息,这些侦听器中只有一个能够获取消息。唯一的例外是close渠道。close所有的听众都将认识到这一点,因此是“广播”。

但是您想要做的是广播从连接读取的消息,因此我们可以执行以下操作:

何时知道听众人数

让每个工作人员收听专用广播频道,并将消息从主频道分发到每个专用广播频道。

type worker struct {
    source chan interface{}
    quit chan struct{}
}

func (w *worker) Start() {
    w.source = make(chan interface{}, 10) // some buffer size to avoid blocking
    go func() {
        for {
            select {
            case msg := <-w.source
                // do something with msg
            case <-quit: // will explain this in the last section
                return
            }
        }
    }()
}

然后我们可能会有很多工人:

workers := []*worker{&worker{}, &worker{}}
for _, worker := range workers { worker.Start() }

然后启动我们的监听器:

go func() {
for {
    conn, _ := listener.Accept()
    ch <- conn
    }
}()

和调度员:

go func() {
    for {
        msg := <- ch
        for _, worker := workers {
            worker.source <- msg
        }
    }
}()

未知的听众人数

在这种情况下,上面给出的解决方案仍然有效。唯一的区别是,每当需要新工作人员时,都需要创建一个新工作人员,将其启动,然后将其推入workers切片。但是此方法需要一个线程安全的切片,该切片周围需要一个锁。一种实现可能如下所示:

type threadSafeSlice struct {
    sync.Mutex
    workers []*worker
}

func (slice *threadSafeSlice) Push(w *worker) {
    slice.Lock()
    defer slice.Unlock()

    workers = append(workers, w)
}

func (slice *threadSafeSlice) Iter(routine func(*worker)) {
    slice.Lock()
    defer slice.Unlock()

    for _, worker := range workers {
        routine(worker)
    }
}

每当您想开始工作时:

w := &worker{}
w.Start()
threadSafeSlice.Push(w)

您的调度员将更改为:

go func() {
    for {
        msg := <- ch
        threadSafeSlice.Iter(func(w *worker) { w.source <- msg })
    }
}()

遗言:永远不要离开悬空的goroutine

好的做法之一是:永远不要离开悬空的goroutine。因此,当您听完后,需要关闭所有触发的goroutine。这将通过以下quit渠道进行worker

首先,我们需要创建一个全局quit信令通道:

globalQuit := make(chan struct{})

每当我们创建一个worker时,我们都会为其分配globalQuit通道作为其退出信号:

worker.quit = globalQuit

然后,当我们要关闭所有工作程序时,我们只需执行以下操作:

close(globalQuit)

由于close所有侦听的goroutine都可以识别(这是您理解的重点),因此将返回所有goroutine。记住也要关闭调度程序,但我会留给您:)

2020-07-02