目錄
- 前言
- 實現(xiàn)
- 接口的設(shè)計
- 內(nèi)部實現(xiàn)
- 測試一下
- 優(yōu)化
- 再測試一下
- 補充
前言
總所周知,go
里面只有兩種 channel
,一種是 unbuffered channel, 其聲明方式為
ch := make(chan interface{})
另一種是 buffered channel,其聲明方式為
bufferSize := 5
ch := make(chan interface{},bufferSize)
對于一個 buffered channel,無論它的 buffer 有多大,它終究是有極限的。這個極限就是該 channel 最初被 make 時,所指定的 bufferSize 。
jojo,buffer channel 的大小是有極限的,我不做 channel 了。
一旦 channel
滿了的話,再往里面添加元素的話,將會阻塞。
so how can we make a infinite buffer channel?
本文參考了 medinum 上面的一篇文章,有興趣的同學可以直接閱讀原文。
實現(xiàn)
接口的設(shè)計
首先當然是建一個 struct
,在百度翻譯的幫助下,我們將這個 struct
取名為 InfiniteChannel
type InfiniteChannel struct {
}
思考一下 channel
的核心行為,實際上就兩個,一個流入(Fan in),一個流出(Fan out),因此我們添加如下幾個 method。
func (c *InfiniteChannel) In(val interface{}) {
// todo
}
func (c *InfiniteChannel) Out() interface{} {
// todo
}
內(nèi)部實現(xiàn)
通過 In()
接收的數(shù)據(jù),總得需要一個地方來存放。我們可以用一個 slice
來存放,就算用 In()
往里面添加了很多元素,也可以通過 append()
來拓展 slice
,slice
的容量可以無限拓展下去(內(nèi)存足夠的話),所以 channel
也是 infinite
。 InfiniteChannel
的第一個成員就這么敲定下來的。
type InfiniteChannel struct {
data []interface{}
}
用戶調(diào)用 In()
和 Out()
時,可能是并發(fā)的環(huán)境,在 go
中如何進行并發(fā)編程,最容易想到的肯定是 channel
了,因此我們在內(nèi)部準備兩個 channel
,一個 inChan
,一個 outChan
,用 inChan
來接收數(shù)據(jù),用 outChan
來流出數(shù)據(jù)。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan - val
}
func (c *InfiniteChannel) Out() interface{} {
return -c.outChan
}
其中, inChan
和 outChan
都是 unbuffered channel。
此外,也肯定是需要一個 select
來處理來自 inChan
和 outChan
身上的事件。因此我們另起一個協(xié)程,在里面做 select
操作。
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := -c.inChan:
c.data = append(c.data, newVal)
case c.outChan - c.pop(): // pop() 將取出隊列的首個元素
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
}
go c.background() // 注意這里另起了一個協(xié)程
return c
}
ps:感覺這也算是 go 并發(fā)編程的一個套路了。即
- 在 new struct 的時候,順手 go 一個 select 協(xié)程,select 協(xié)程內(nèi)執(zhí)行一個 for 循環(huán),不停的 select,監(jiān)聽一個或者多個 channel 的事件。
- struct 對外提供的 method,只會操作 struct 內(nèi)的 channel(在本例中就是 inChan 和 outChan),不會操作 struct 內(nèi)的其他數(shù)據(jù)(在本例中,In() 和 Out() 都沒有直接操作 data)。
- 觸發(fā) channel 的事件后,由 select 協(xié)程進行數(shù)據(jù)的更新(在本例中就是 data )。因為只有 select 協(xié)程對除 channel 外的數(shù)據(jù)成員進行讀寫操作,且 go 保證了對于 channel 的并發(fā)讀寫是安全的,所以代碼是并發(fā)安全的。
- 如果 struct 是 exported ,用戶或許會越過 new ,直接手動 make 一個 struct,可以考慮將 struct 設(shè)置為 unexported,把它的首字母小寫即可。
pop()
的實現(xiàn)也非常簡單。
// 取出隊列的首個元素,如果隊列為空,將會返回一個 nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
測試一下
用一個協(xié)程每秒鐘生產(chǎn)一條數(shù)據(jù),另一個協(xié)程每半秒消費一條數(shù)據(jù),并打印。
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
}()
for i := 0; i 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// out
nil>0nil>1nil>23nil>4nil>nil>5nil>67nil>nil>89nil>nil>1011nil>12nil>13nil>14nil>15nil>16nil>17nil>nil>1819nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>
Process finished with the exit code 0
可以看到,將 InfiniteChannel
內(nèi)沒有數(shù)據(jù)可供消費時,調(diào)用 Out()
將會返回一個 nil
,不過這也在我們的意料之中,原因是 pop()
在隊列為空時,將會返回 nil。
目前 InfiniteChannel
的行為與標準的 channel
的行為是有出入的,go
中的 channel
,在沒有數(shù)據(jù)卻仍要取數(shù)據(jù)時會被阻塞,如何實現(xiàn)這個效果?
優(yōu)化
我認為此處是是整篇文章最有技巧的地方,我第一次看到時忍不住拍案叫絕。
首先把原來的 background()
摘出來
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := -c.inChan:
c.data = append(c.data, newVal)
case c.outChan - c.pop():
}
}
}
對 outChan
進行一個簡單封裝
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := -c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() - c.pop():
}
}
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
return c.outChan
}
目前為止,一切照舊。
點睛之筆來了:
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if len(c.data) == 0 {
return nil
}
return c.outChan
}
在 c.data
為空的時候,返回一個 nil
在 background()
中,當執(zhí)行到 case c.outChan - c.pop():
時,實際上將會變成:
在 go
中,是無法往一個 nil
的 channel
中發(fā)送元素的。例如
func main() {
var c chan interface{}
select {
case c - 1:
}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
var c chan interface{}
select {
case c - 1:
default:
fmt.Println("hello world")
}
}
// hello world
因此,對于
select {
case newVal := -c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() - c.pop():
}
將會一直阻塞在 select
那里,直到 inChan
來了數(shù)據(jù)。
再測試一下
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
最后,程序 panic
了,因為死鎖了。
補充
實際上 channel
除了 In()
和 Out()
外,還有一個行為,即 close()
,如果 channel close 后,依舊從其中取元素的話,將會取出該類型的默認值。
func main() {
c := make(chan interface{})
close(c)
for true {
v := -c
fmt.Println(v)
time.Sleep(time.Second)
}
}
// output
// nil>
// nil>
// nil>
// nil>
func main() {
c := make(chan interface{})
close(c)
for true {
v, isOpen := -c
fmt.Println(v, isOpen)
time.Sleep(time.Second)
}
}
// output
// nil> false
// nil> false
// nil> false
// nil> false
我們也需要實現(xiàn)相同的效果。
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := -c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() - c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
// 這里添加了對 c.isOpen 的判斷
if c.isOpen len(c.data) == 0 {
return nil
}
return c.outChan
}
再測試一下
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close() // 這里調(diào)用了 Close
}()
for i := 0; i 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// output
012345678910111213141516171819nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>nil>
Process finished with the exit code 0
符合預期
遺憾
目前看上去已經(jīng)很完美了,但是和標準的 channel
相比,仍然有差距。因為標準的 channel
是有這種用法的
可以通過 isOpen
變量來獲取 channel
的開閉情況。
因此 InfiniteChannel
也應(yīng)該提供一個類似的 method
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
// todo
}
可惜的是,要想得知 InfiniteChannel
是否是 Open
的,就必定要訪問 InfiniteChannel
內(nèi)的 isOpen
成員。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
而 isOpen
并非 channel
類型,根據(jù)之前的套路,這種非 channel
類型的成員只應(yīng)該被 select
協(xié)程訪問。一旦有多個協(xié)程訪問,就會出現(xiàn)并發(fā)問題,除非加鎖。
我不能接受!所以干脆不提供這個 method 了,嘿嘿。
完整代碼
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close()
}()
for i := 0; i 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan - val
}
func (c *InfiniteChannel) Out() interface{} {
return -c.outChan
}
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := -c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() - c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
// 取出隊列的首個元素,如果隊列為空,將會返回一個 nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if c.isOpen len(c.data) == 0 {
return nil
}
return c.outChan
}
參考
https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
以上就是再次探討go實現(xiàn)無限 buffer 的 channel方法的詳細內(nèi)容,更多關(guān)于go無限 buffer 的 channel的資料請關(guān)注腳本之家其它相關(guān)文章!
您可能感興趣的文章:- Go語言中使用 buffered channel 實現(xiàn)線程安全的 pool
- go原生庫的中bytes.Buffer用法
- C#語言使用gRPC、protobuf(Google Protocol Buffers)實現(xiàn)文件傳輸功能
- 詳解Django-channels 實現(xiàn)WebSocket實例
- Django使用Channels實現(xiàn)WebSocket的方法
- Django Channels 實現(xiàn)點對點實時聊天和消息推送功能