本篇博客讲述了使用Golang内置库搭建聊天室的方法。

1. 如何去构建一个chat server

chat server要做的事情就是接收用户发来的信息,并且将这些信息广播给所有在聊天室中的用户。

  • 服务端处于空闲状态时,会一直监听给定的端口

  • 当用户发来一个连接请求时,服务端会启动一个新的goroutine去处理这个请求,处理的内容包括读取用户输入的信息和向用户发送信息

但是问题来了,如何使得一个用户发送的信息对聊天室所有的成员可见呢?

  • 设置一个map,里面记录了所有的用户连接信息
  • 设置一个broadcaster,使用select语句处理所有以下所有的channel,并且根据从channel中获取的信息,来对map进行修改

  • 设置一个channel entering,当用户发来一个连接请求时,会首先向entering中发送信息,此时broadcaster会将该用户的连接信息添加到map中
  • 设置一个channel leaving,当用户离开聊天室时,会向leaving中发送信息,此时broadcaster会将用户的连接信息从map中删除
  • 设置一个channel messages,当用户在聊天室中发送信息时,会向messages发送数据,此时broadcaster会遍历map,将从messages中获取到的信息,发送给聊天室中的所有成员

2. 使用go net包搭建chat server

注:以下代码来自于Gopl第8章第10节

main函数的框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
go broadcaster()
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}

首先服务器会在localhost:8000监听tcp连接,之后每次接收到一个tcp连接,都会启动一个goroutine,调用handleConn来处理该连接。需要注意的是,如果此处没有启动新的goroutine的话,那么只有等到当前连接结束之后,server才能处理下一个tcp连接。

下面看一下handleConn的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func handleConn(conn net.Conn) {
ch := make(chan string) // outgoing client messages
go clientWriter(conn, ch)

who := conn.RemoteAddr().String()
ch <- "You are " + who
messages <- who + " has arrived"
entering <- ch

input := bufio.NewScanner(conn)
for input.Scan() {
messages <- who + ": " + input.Text()
}
// NOTE: ignoring potential errors from input.Err()

leaving <- ch
messages <- who + " has left"
conn.Close()
}

func clientWriter(conn net.Conn, ch <-chan string) {
for msg := range ch {
fmt.Fprintln(conn, msg) // NOTE: ignoring network errors
}
}

先看一下clientWriter的代码,很简单,clientWriter做的事情就是从channel ch中读取数据,然后发送给用户。net.Conn实现了io.Writer接口,因此直接调用fmt.Fprintln就直接可以向用户发送数据。

再来看handleConn的代码。这个函数上来就是先创建了一个channel ch,这个ch是服务器向用户传送信息的channel,每次向用户发送信息时,都会直接将信息发送到这个channel中;而接下来启动的这个goroutine调用clientWriterch中的信息读取出来,再发送给用户。

之后handleConnch发送到channel entering中,broadcaster会从entering中读到ch,并添加ch到map中(此处的map相当于是一个数据库)。

下面handleConn要做的是就是从用户处接收输入,并将用户的输入发送到channel messages里面。

当用户停止发送信息,准备离开聊天室时,handleConnch发送到leaving中,broadcaster从leaving中读取到ch,将ch从map中删除。此后关闭连接。

下面再来看一下broadcaster的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type client chan<- string // an outgoing message channel

var (
entering = make(chan client)
leaving = make(chan client)
messages = make(chan string) // all incoming client messages
)

func broadcaster() {
clients := make(map[client]bool) // all connected clients
for {
select {
case msg := <-messages:
// Broadcast incoming message to all
// clients' outgoing message channels.
for cli := range clients {
cli <- msg
}
case cli := <-entering:
clients[cli] = true

case cli := <-leaving:
delete(clients, cli)
close(cli)
}
}
}

在main函数开始执行时,就创建了一个broadcaster goroutine。通过多路复用,broadcaster实现了对三个channel的操作。broadcaster对enteringleaving的处理此处不再赘述,下面来看一下对channel messages的处理。其实也很简单,因为map中已经存放了所有用户的信息,那么只要遍历map就可以实现广播。

下面看一下运行结果:

首先是三个用户依次进入聊天室:

server:

1
go run main.go

client1:

1
2
3
4
❯ netcat localhost 8000
You are 127.0.0.1:37296
127.0.0.1:45114 has arrived
127.0.0.1:45124 has arrived

client2:

1
2
3
❯ netcat localhost 8000
You are 127.0.0.1:45114
127.0.0.1:45124 has arrived

client3:

1
2
❯ netcat localhost 8000
You are 127.0.0.1:45124

此后从client1-client3依次发送self-introduction:

client1:

1
2
3
4
5
6
7
8
❯ netcat localhost 8000
You are 127.0.0.1:37296
127.0.0.1:45114 has arrived
127.0.0.1:45124 has arrived
Hi, I am client1
127.0.0.1:37296: Hi, I am client1
127.0.0.1:45114: Hi, I am client2
127.0.0.1:45124: Hi, I am client3

client2:

1
2
3
4
5
6
7
❯ netcat localhost 8000
You are 127.0.0.1:45114
127.0.0.1:45124 has arrived
127.0.0.1:37296: Hi, I am client1
Hi, I am client2
127.0.0.1:45114: Hi, I am client2
127.0.0.1:45124: Hi, I am client3

client3:

1
2
3
4
5
6
❯ netcat localhost 8000
You are 127.0.0.1:45124
127.0.0.1:37296: Hi, I am client1
127.0.0.1:45114: Hi, I am client2
Hi, I am client3
127.0.0.1:45124: Hi, I am client3

原理图大概如下,其中方形代表goroutine,椭圆形代表channel:

3. A more advanced chat server

项目地址:https://github.com/TutorialEdge/realtime-chat-go-react.git

这个项目用的不是我们之前的tcp socket了,而是建立在tcp之上的websocket通信协议,但是server设计的基本思想还是一样的。

websocket是在TCP上独立设计的全双工,有message概念的通信协议,为了兼容互联网现状,因此使用了和HTTP相同的端口和相似的协议形式,下面是一段websocket协议的报文:

1
2
3
4
5
6
7
8
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com

由于websocket协议和HTTP协议相似,所以需要在websocket header中注明UpgradeConnection字段。

当客户端和服务端建立起websocket连接时,由于协议是双向的,服务端可以不断地推送信息给客户端。

websocket解决了HTTP的被动性(只能由客户端向服务端发送HTTP请求,也就是说通信只能由客户端发起)和无状态(服务端不会记录来自客户端的信息,每一次客户端发送HTTP请求时,都要携带之前的信息)的问题。

如果不使用websocket协议,那么则需要使用ajax或者long poll来实现双方的通信。

ajax是客户端不断地发送HTTP请求询问服务端是否有新的信息推送给客户端,没有则返回。这对服务端处理HTTP请求的速度有很高的要求。

而long poll采用的是阻塞模型,如果客户端询问服务端,但是服务端没有新的信息推送给客户端时,线程会阻塞,直到服务端有信息,并将信息推送给客户端。这对服务端并发能力提出了很高的要求。

下面来看一下项目的代码。先来看后端:

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func setupRoutes() {
pool := websocket.NewPool()
go pool.Start()

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(pool, w, r)
})
}

func main() {
fmt.Println("Distributed Chat App v0.01")
setupRoutes()
http.ListenAndServe(":8080", nil)
}

main函数中做的是setup routes,然后监听端口。函数setupRoutes新建一个pool(相当于#2 中的三个channel和一个map),并且启动一个goroutine(相当于#2 中的broadcaster)。

在前面已经提到过,直接调用http.handleFunchttp.ListenAndServe是在defaultServeMux中注册了路由。

下面再来看handler serveWs的代码:

main.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
fmt.Println("WebSocket Endpoint Hit")
conn, err := websocket.Upgrade(w, r)
if err != nil {
fmt.Fprintf(w, "%+v\n", err)
}

client := &websocket.Client{
Conn: conn,
Pool: pool,
}

pool.Register <- client
client.Read()
}

代码其实与#2 中的handleConn相似,只不过是调用了包"github.com/gorilla/websocket"中的API实现了对websocket header中Upgrade字段的处理。

再来看一下前端:

src/api/index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const socket = new WebSocket('ws://localhost:8080/ws') // eslint-disable-line no-undef

const connect = callback => {
console.log('Attempting Connection')

socket.onopen = () => {
console.log('Successfully Connected')
}

socket.onmessage = msg => {
console.log(msg)
callback(msg)
}

socket.onclose = event => {
console.log('Socket Closed Connection: ', event)
}

socket.onerror = error => {
console.log('Socket Error: ', error)
}
}

const sendMsg = msg => {
console.log('Sending message: ', msg)
socket.send(msg)
}

export { connect, sendMsg }

首先是建立一个websocket,注意链接不再是我们熟悉的http://...而是ws://localhost:8080/ws,是因为协议已经不同了。

同时设置两个API connectsendMsg,为app提供websocket支持。connect是用来建立websocket连接,而sendMsg是通过websocket发送信息。

src/App.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
function App () {
const [chatHistory, setChatHistory] = React.useState([])
React.useEffect(() => {
connect(msg => {
console.log('New message')

const newChatHistory = [...chatHistory, msg]
setChatHistory(newChatHistory)
})
console.log(chatHistory)
})

function handleSend (event) {
if (event.keyCode === 13) {
sendMsg(event.target.value)
event.target.value = ''
}
}

return (
<div>
<Header />
<ChatHistory chatHistory={chatHistory} />
<ChatInput handleSend={handleSend} />
</div>
)
}
export default App

调用React.useEffect,使得每次进入页面或者刷新页面时,都会调用connect与server建立websocket连接。handleSend是提供给ChatInput component的,目的是在每次按下回车键时,调用sendMsg发送信息。