1 什么是協(xié)程
協(xié)程是與其他函數(shù)或方法一起并發(fā)運(yùn)行的函數(shù)或方法。Go協(xié)程可以看作是輕量級(jí)線程,與線程相比,創(chuàng)建一個(gè) Go 協(xié)程的成本很小。
1.1 協(xié)程與線程的對(duì)比
協(xié)程的成本極低。堆棧大小只有若干KB(2或4KB),并且可以根據(jù)應(yīng)用的需求進(jìn)行增減。而線程必須指定堆棧的大小,其堆棧是固定不變的(一般默認(rèn)2MB)。固定了棧的大小導(dǎo)致兩個(gè)問題:
一是對(duì)于很多只需要很小的棧空間的線程來(lái)說(shuō)是一個(gè)巨大的浪費(fèi)
二是對(duì)于少數(shù)需要巨大??臻g的線程來(lái)說(shuō)又面臨棧溢出的風(fēng)險(xiǎn)
協(xié)程會(huì)復(fù)用(Multiplex)數(shù)量更少的 OS 線程。即使程序有數(shù)以千計(jì)的協(xié)程,也可能只有一個(gè)線程。
如果該線程中的某一Go協(xié)程發(fā)生了阻塞(比如說(shuō)等待用戶輸入),那么系統(tǒng)會(huì)再創(chuàng)建一個(gè)OS線程,并把其余協(xié)程都移動(dòng)到這個(gè)新的OS線程。所有這一切都在運(yùn)行時(shí)進(jìn)行,作為程序員,我們沒有直接面臨這些復(fù)雜的細(xì)節(jié),而是有一個(gè)簡(jiǎn)潔的 API 來(lái)處理并發(fā)。
Go內(nèi)置半搶占式的協(xié)作調(diào)度器,在用戶態(tài)進(jìn)行協(xié)程的調(diào)度。
Go協(xié)程使用信道(Channel)來(lái)進(jìn)行通信。信道用于防止多個(gè)協(xié)程訪問共享內(nèi)存時(shí)發(fā)生競(jìng)態(tài)條件(Race Condition)。信道可以看作是協(xié)程之間通信的管道。
1.2 啟動(dòng)協(xié)程
調(diào)用函數(shù)或者方法時(shí),在前面加上關(guān)鍵字go,可以讓一個(gè)新的Go協(xié)程并發(fā)地運(yùn)行。需要注意:
啟動(dòng)一個(gè)新的協(xié)程時(shí),協(xié)程的調(diào)用會(huì)立即返回。程序控制不會(huì)去等待Go協(xié)程執(zhí)行完畢。在調(diào)用Go協(xié)程之后,程序控制會(huì)立即返回到代碼的下一行,忽略該協(xié)程的任何返回值。
如果希望運(yùn)行其他Go協(xié)程,Go 主協(xié)程必須繼續(xù)運(yùn)行著。如果Go主協(xié)程終止,則程序終止,于是其他協(xié)程也不會(huì)繼續(xù)運(yùn)行。
使用示例如下:
package?main import?(?? ????"fmt" ????"time" ) func?numbers()?{?? ????for?i?:=?1;?i?<=?5;?i++?{ ????????time.Sleep(250?*?time.Millisecond) ????????fmt.Printf("%d?",?i) ????} } func?alphabets()?{?? ????for?i?:=?'a';?i?<=?'e';?i++?{ ????????time.Sleep(400?*?time.Millisecond) ????????fmt.Printf("%c?",?i) ????} } func?main()?{?? ????go?numbers()?//啟動(dòng)協(xié)程 ????go?alphabets()?//啟動(dòng)協(xié)程 ????//等待子協(xié)程允許完畢,后面介紹更高級(jí)的信道方式,這里就簡(jiǎn)單的等待 ????time.Sleep(3000?*?time.Millisecond) ????fmt.Println("main?terminated") } //輸出:1?a?2?3?b?4?c?5?d?e?main?terminated
下圖可以清晰的看到三個(gè)協(xié)程的運(yùn)行關(guān)系:
2 信道
2.1 信道的創(chuàng)建
信道可以想像成協(xié)程之間通信的管道。如同管道中的水會(huì)從一端流到另一端,通過(guò)使用信道,數(shù)據(jù)也可以從一端發(fā)送,在另一端接收。所有信道都關(guān)聯(lián)了一個(gè)類型。信道只能運(yùn)輸這種類型的數(shù)據(jù),而運(yùn)輸其他類型的數(shù)據(jù)都是非法的。chan T表示T類型的信道,使用make函數(shù)進(jìn)行初始化。例如:
a?:=?make(chan?int)
2.2 信道的收發(fā)
信道旁的箭頭方向指定了是發(fā)送數(shù)據(jù)還是接收數(shù)據(jù)
data?:=?<-?a?//?讀取信道a,保存值到data a?<-?data?//?寫入信道a
發(fā)送與接收默認(rèn)是阻塞的。當(dāng)把數(shù)據(jù)發(fā)送到信道時(shí),程序控制會(huì)在發(fā)送數(shù)據(jù)的語(yǔ)句處發(fā)生阻塞,直到有其它協(xié)程從信道讀取到數(shù)據(jù),才會(huì)解除阻塞。與此類似,當(dāng)讀取信道的數(shù)據(jù)時(shí),如果沒有其它的協(xié)程把數(shù)據(jù)寫入到這個(gè)信道,那么讀取過(guò)程就會(huì)一直阻塞著。**信道的這種特性能夠幫助Go協(xié)程之間進(jìn)行高效的通信,不需要用到其他編程語(yǔ)言常見的顯式鎖或條件變量。 借助阻塞這個(gè)特性,我們可以用一個(gè)讀操作等待子協(xié)程結(jié)束,而不是使用sleep:
func?hello(done?chan?bool)?{?? ????fmt.Println("Hello?world?goroutine") ????done?<-?true//子協(xié)程結(jié)束,寫入數(shù)據(jù) } func?main()?{?? ????done?:=?make(chan?bool)//創(chuàng)建bool信道 ????go?hello(done) ????<-done?//讀操作,一直阻塞直到子協(xié)程結(jié)束 ????fmt.Println("main?function") }
2.3 小心死鎖
使用信道需要考慮的一個(gè)重點(diǎn)是死鎖。
當(dāng)Go協(xié)程給一個(gè)信道發(fā)送數(shù)據(jù)時(shí),照理說(shuō)會(huì)有其他Go協(xié)程來(lái)接收數(shù)據(jù)。如果沒有的話,程序就會(huì)在運(yùn)行時(shí)觸發(fā) panic,形成死鎖。
當(dāng)有Go協(xié)程等著從一個(gè)信道接收數(shù)據(jù)時(shí),我們期望其他的Go協(xié)程會(huì)向該信道寫入數(shù)據(jù),要不然程序就會(huì)觸發(fā) panic。
2.4 關(guān)閉信道和range遍歷
數(shù)據(jù)發(fā)送方可以關(guān)閉信道,通知接收方這個(gè)信道不再有數(shù)據(jù)發(fā)送過(guò)來(lái)。當(dāng)從信道接收數(shù)據(jù)時(shí),接收方可以多用一個(gè)變量來(lái)檢查信道是否已經(jīng)關(guān)閉。
func?producer(chnl?chan?int)?{??
????for?i?:=?0;?i?10;?i++?{ ????????chnl?<-?i ????} ????close(chnl)//關(guān)閉信道 } func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?{ ????????v,?ok?:=?<-ch?//判斷信道是否關(guān)閉 ????????if?ok?==?false?{ ????????????break ????????} ????????fmt.Println("Received?",?v,?ok) ????} }
上面的語(yǔ)句里,如果成功接收信道所發(fā)送的數(shù)據(jù),那么 ok 等于 true。而如果 ok 等于 false,說(shuō)明我們?cè)噲D讀取一個(gè)關(guān)閉的通道。從關(guān)閉的信道讀取到的值會(huì)是該信道類型的零值。 或者我們可以用range遍歷信道,代替上面示例中的for循環(huán):
func?main()?{?? ????ch?:=?make(chan?int) ????go?producer(ch) ????for?v?:=?range?ch?{//range可以在信道關(guān)閉后自動(dòng)結(jié)束,不用顯示的判斷 ????????fmt.Println("Received?",v) ????} }
2.5 緩沖信道
上面無(wú)緩沖信道的發(fā)送和接收過(guò)程是阻塞的,讀寫操作會(huì)一直阻塞。我們還可以創(chuàng)建一個(gè)有緩沖的信道(Buffered Channel)。只在緩沖已滿的情況,才會(huì)阻塞向緩沖信道發(fā)送數(shù)據(jù)。同樣,只有在緩沖為空的時(shí)候,才會(huì)阻塞從緩沖信道接收數(shù)據(jù)。 通過(guò)向 make 函數(shù)時(shí)再傳遞一個(gè)表示容量的參數(shù)(指定緩沖的大小,sizeof(type) * capacity),就可以創(chuàng)建緩沖信道。
ch?:=?make(chan?type,?capacity)//capacity?應(yīng)該大于?0。無(wú)緩沖信道的容量默認(rèn)為?0
緩沖區(qū)容量和長(zhǎng)度的區(qū)別:
容量是指信道可以存儲(chǔ)的值的數(shù)量(總的大?。?。我們?cè)谑褂胢ake函數(shù)創(chuàng)建緩沖信道的時(shí)候會(huì)指定容量大小。
長(zhǎng)度是指信道中當(dāng)前排隊(duì)的元素個(gè)數(shù)(當(dāng)前保存的大小)。
使用示例如下:
func?write(ch?chan?int)?{??
????for?i?:=?0;?i?5;?i++?{ ????????ch?<-?i?//寫入兩個(gè)值之后緩沖區(qū)滿,阻塞等待緩沖區(qū)空閑 ????????fmt.Println("successfully?wrote",?i,?"to?ch") ????} ????close(ch) } func?main()?{?? ????ch?:=?make(chan?int,?2)//緩沖大小為2 ????go?write(ch) ????time.Sleep(2?*?time.Second) ????for?v?:=?range?ch?{ ????????fmt.Println("read?value",?v,"from?ch") ????????time.Sleep(2?*?time.Second) ????} }
2.6 select
select 語(yǔ)句用于在多個(gè)發(fā)送/接收信道操作中進(jìn)行選擇。該語(yǔ)法與 switch 類似,所不同的是,這里的每個(gè) case 語(yǔ)句都是信道操作。
select 語(yǔ)句會(huì)一直阻塞,直到發(fā)送/接收操作準(zhǔn)備就緒。如果有多個(gè)信道操作準(zhǔn)備完畢,select 會(huì)隨機(jī)地選取其中之一執(zhí)行。
在沒有case準(zhǔn)備就緒時(shí),可以執(zhí)行select語(yǔ)句中的默認(rèn)情況(Default Case),這通常用于防止select語(yǔ)句一直阻塞,沒有信道可用時(shí)會(huì)立刻返回。
使用示例:
func?server1(ch?chan?string)?{?? ????time.Sleep(6?*?time.Second) ????ch?<-?"from?server1" } func?server2(ch?chan?string)?{?? ????time.Sleep(3?*?time.Second) ????ch?<-?"from?server2" } func?main()?{?? ????output1?:=?make(chan?string) ????output2?:=?make(chan?string) ????go?server1(output1) ????go?server2(output2) ????select?{//一直阻塞,直到某個(gè)信道可用 ????case?s1?:=?<-output1: ????????fmt.Println(s1) ????case?s2?:=?<-output2: ????????fmt.Println(s2) ????} }
3 WaitGroup
3.1 WaitGroup的使用
WaitGroup可以用來(lái)等待一批go協(xié)程執(zhí)行結(jié)束,類似于C++的std::join。使用示例如下:
import?( ????"fmt" ????"sync" ????"time" ) func?process(i?int,?wg?*sync.WaitGroup)?{//waitgroup參數(shù)指針,因?yàn)橐薷膬?nèi)部的值,不能是值傳遞 ????fmt.Println("started?Goroutine?",?i) ????time.Sleep(2?*?time.Second) ????fmt.Printf("Goroutine?%d?ended ",?i) ????wg.Done()//子協(xié)程結(jié)束,調(diào)用done減少計(jì)數(shù)器 } func?main()?{ ????no?:=?3 ????var?wg?sync.WaitGroup?//定義waitgroup ????for?i?:=?0;?i?3.2 實(shí)現(xiàn)一個(gè)協(xié)程池
基本思路:
創(chuàng)建一個(gè)Go協(xié)程池,監(jiān)聽一個(gè)等待作業(yè)分配的輸入型緩沖信道
將作業(yè)添加到該輸入型緩沖信道中
作業(yè)完成后,再將結(jié)果寫入一個(gè)輸出型緩沖信道
從輸出型緩沖信道讀取并打印結(jié)果
代碼和解析如下:
package?main import?(?? ????"fmt" ????"math/rand" ????"sync" ????"time" ) //定義任務(wù)和結(jié)果兩個(gè)結(jié)構(gòu)體 type?Job?struct?{?? ????id???????int ????randomno?int } type?Result?struct?{?? ????job?????????Job?//包含job結(jié)構(gòu)體 ????sumofdigits?int } //創(chuàng)建任務(wù)和結(jié)果的兩個(gè)緩沖信道 var?jobs?=?make(chan?Job,?10)?? var?results?=?make(chan?Result,?10) //計(jì)算一個(gè)整數(shù)每一位相加的和 func?digits(number?int)?int?{?? ????sum?:=?0 ????no?:=?number ????for?no?!=?0?{ ????????digit?:=?no?%?10 ????????sum?+=?digit ????????no?/=?10 ????} ????time.Sleep(2?*?time.Second) ????return?sum } //遍歷job信道,計(jì)算后每個(gè)job的數(shù)字并將結(jié)果寫入reslut信道 func?worker(wg?*sync.WaitGroup)?{?? ????for?job?:=?range?jobs?{ ????????output?:=?Result{job,?digits(job.randomno)} ????????results?<-?output ????} ????wg.Done() } //初始化waitgroup,并開啟多個(gè)協(xié)程開始計(jì)算 func?createWorkerPool(noOfWorkers?int)?{?? ????var?wg?sync.WaitGroup ????for?i?:=?0;?i?4 協(xié)程的同步手段4.1 互斥與Mutex
Mutex用于提供一種加鎖機(jī)制(Locking Mechanism),可確保在某時(shí)刻只有一個(gè)協(xié)程在臨界區(qū)運(yùn)行,以防止出現(xiàn)競(jìng)態(tài)條件。Mutex可以在sync包內(nèi)找到。Mutex 定義了兩個(gè)方法:Lock和Unlock。所有在 Lock 和 Unlock 之間的代碼,都只能由一個(gè)Go協(xié)程執(zhí)行,于是就可以避免競(jìng)態(tài)條件。
mutex.Lock() x?=?x?+?1?? mutex.Unlock()使用示例:
//互斥鎖保證線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?struct?{?//全局的結(jié)構(gòu)體變量 ?sync.Mutex?//互斥鎖 ?value??????int } func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?for?i?:=?0;?i?<=?100;?i++?{ ??total.Lock()?//加鎖 ??total.value++ ??total.Unlock()?//解鎖 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(total.value) }4.2 原子操作
用互斥鎖來(lái)保護(hù)一個(gè)數(shù)值型的共享資源,麻煩且效率低下。標(biāo)準(zhǔn)庫(kù)的sync/atomic包對(duì)原子操作提供了豐富的支持:sync/atomic包對(duì)基本的數(shù)值類型及復(fù)雜對(duì)象的讀寫都提供了原子操作的支持。atomic.Value原子對(duì)象提供了Load和Store兩個(gè)原子方法,分別用于加載和保存數(shù)據(jù),返回值和參數(shù)都是interface{}類型。
//原子操作實(shí)現(xiàn)線程同步 package?main import?( ?"fmt" ?"sync" ?"sync/atomic" ) var?total?uint64 func?worker(wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??atomic.AddUint64(&total,?1)?//原子操作,線程安全的 ?} } func?main()?{ ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg) ?go?worker(&wg) ?wg.Wait() ?fmt.Println(atomic.LoadUint64(&total))?//讀取值 }4.3 阻塞信道
上面的示例我們也可以用信道來(lái)實(shí)現(xiàn)互斥(還是推薦實(shí)際中使用Mutex),使用大小為1的緩沖信道可以導(dǎo)致可寫阻塞,這樣其他協(xié)程就不能繼續(xù)執(zhí)行,只能等待阻塞結(jié)束。在并發(fā)編程中,對(duì)共享資源的正確訪問需要精確的控制,在目前的絕大多數(shù)語(yǔ)言中,都是通過(guò)加鎖等線程同步方案來(lái)解決這一困難問題,而Go語(yǔ)言卻另辟蹊徑,它將共享的值通過(guò)Channel傳遞(實(shí)際上多個(gè)獨(dú)立執(zhí)行的線程很少主動(dòng)共享資源)。在任意給定的時(shí)刻,最好只有一個(gè)Goroutine能夠擁有該資源。
//使用channel實(shí)現(xiàn)線程同步 package?main import?( ?"fmt" ?"sync" ) var?total?uint64 func?worker(wg?*sync.WaitGroup,?ch?chan?bool)?{ ?defer?wg.Done() ?var?i?uint64 ?for?i?=?0;?i?<=?100;?i++?{ ??ch?<-?true?//信道被寫入值,其他協(xié)程到這一句也想寫入值,就會(huì)阻塞等待信道可寫 ??total++ ??<-ch?//本協(xié)程讀取信道,信道空了,其他協(xié)程可以寫入了 ?} } func?main()?{ ?ch?:=?make(chan?bool,?1)?//?創(chuàng)建大小為1的chan ?var?wg?sync.WaitGroup ?wg.Add(2) ?go?worker(&wg,?ch) ?go?worker(&wg,?ch) ?wg.Wait() ?fmt.Println(total)?//讀取值 }不僅如此,我們還可以通過(guò)設(shè)置chan的緩存大小來(lái)控制最大并發(fā)數(shù)。5 常見并發(fā)模型
5.1 生產(chǎn)者消費(fèi)者模型
通過(guò)平衡生產(chǎn)線程和消費(fèi)線程的工作能力來(lái)提高程序的整體處理數(shù)據(jù)的速度。簡(jiǎn)單地說(shuō),就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù),然后放到成果隊(duì)列中,同時(shí)消費(fèi)者從成果隊(duì)列中來(lái)取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費(fèi)變成了異步的兩個(gè)過(guò)程。當(dāng)成果隊(duì)列中沒有數(shù)據(jù)時(shí),消費(fèi)者就進(jìn)入饑餓的等待中;而當(dāng)成果隊(duì)列中數(shù)據(jù)已滿時(shí),生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致CPU被剝奪的下崗問題。 Go可以使用帶緩沖區(qū)的chan作為成功隊(duì)列,由不同的協(xié)程負(fù)責(zé)接入和讀取,很簡(jiǎn)單的實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型:
package?main import?( ?"fmt" ?"os" ?"os/signal" ?"syscall" ) //?生產(chǎn)者:?生成?factor?整數(shù)倍的序列 func?Producer(factor?int,?out?chan<-?int)?{ ?for?i?:=?0;?;?i++?{ ??out?<-?i?*?factor?//往信道緩沖區(qū)內(nèi)寫入數(shù)據(jù) ?} } //?消費(fèi)者 func?Consumer(in?<-chan?int)?{ ?for?v?:=?range?in?{ ??fmt.Println(v)?//從信道讀取數(shù)據(jù)打印 ?} } func?main()?{ ?ch?:=?make(chan?int,?64)?//?成果隊(duì)列,大小為64 ?//開啟了2個(gè)Producer生產(chǎn)流水線,分別用于生成3和5的倍數(shù)的序列 ?//然后開啟1個(gè)Consumer消費(fèi)者線程,打印獲取的結(jié)果 ?go?Producer(3,?ch)?//?生成?3?的倍數(shù)的序列 ?go?Producer(5,?ch)?//?生成?5?的倍數(shù)的序列 ?go?Consumer(ch)????//?消費(fèi)?生成的隊(duì)列 ?//?Ctrl+C?退出 ?sig?:=?make(chan?os.Signal,?1) ?signal.Notify(sig,?syscall.SIGINT,?syscall.SIGTERM) ?fmt.Printf("quit?(%v) ",?<-sig) }5.2 發(fā)布訂閱模型
發(fā)布訂閱(publish/subscribe)模型通常被簡(jiǎn)寫為pub/sub模型。在這個(gè)模型中,消息生產(chǎn)者成為發(fā)布者(publisher),而消息消費(fèi)者則成為訂閱者(subscriber),生產(chǎn)者和消費(fèi)者是M:N的關(guān)系。在傳統(tǒng)生產(chǎn)者和消費(fèi)者模型中,是將消息發(fā)送到一個(gè)隊(duì)列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個(gè)主題。在發(fā)布訂閱模型中,每條消息都會(huì)傳送給多個(gè)訂閱者。發(fā)布者通常不會(huì)知道、也不關(guān)心哪一個(gè)訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)添加,是一種松散的耦合關(guān)系,這使得系統(tǒng)的復(fù)雜性可以隨時(shí)間的推移而增長(zhǎng)。在現(xiàn)實(shí)生活中,像天氣預(yù)報(bào)之類的應(yīng)用就可以應(yīng)用這個(gè)并發(fā)模式。 示例代碼如下:
//?發(fā)布訂閱模型實(shí)現(xiàn) package?pubsub import?( ?"sync" ?"time" ) type?( ?subscriber?chan?interface{}?????????//?訂閱者為一個(gè)管道 ?topicFunc??func(v?interface{})?bool?//?主題為一個(gè)過(guò)濾器 ) //?發(fā)布者對(duì)象 type?Publisher?struct?{ ?m???????????sync.RWMutex?????????????//?讀寫鎖,保護(hù)訂閱者map ?buffer??????int??????????????????????//?訂閱隊(duì)列的緩存大小 ?timeout?????time.Duration????????????//?發(fā)布超時(shí)時(shí)間 ?subscribers?map[subscriber]topicFunc?//?訂閱者信息 } //?構(gòu)建一個(gè)發(fā)布者對(duì)象,?可以設(shè)置發(fā)布超時(shí)時(shí)間和緩存隊(duì)列的長(zhǎng)度 func?NewPublisher(publishTimeout?time.Duration,?buffer?int)?*Publisher?{ ?return?&Publisher{?//返回對(duì)象指針 ??buffer:??????buffer, ??timeout:?????publishTimeout, ??subscribers:?make(map[subscriber]topicFunc),?//創(chuàng)建訂閱者map ?} } //?添加一個(gè)新的訂閱者,訂閱全部主題 func?(p?*Publisher)?Subscribe()?chan?interface{}?{ ?return?p.SubscribeTopic(nil) } //?添加一個(gè)新的訂閱者,訂閱過(guò)濾器篩選后的主題 func?(p?*Publisher)?SubscribeTopic(topic?topicFunc)?chan?interface{}?{ ?ch?:=?make(chan?interface{},?p.buffer) ?p.m.Lock() ?p.subscribers[ch]?=?topic ?p.m.Unlock() ?return?ch } //?退出訂閱 func?(p?*Publisher)?Evict(sub?chan?interface{})?{ ?p.m.Lock() ?defer?p.m.Unlock()?//函數(shù)退出時(shí)解鎖 ?delete(p.subscribers,?sub)?//根據(jù)key刪除map中一項(xiàng) ?close(sub)?????????????????//關(guān)閉chan } //?發(fā)布一個(gè)主題 func?(p?*Publisher)?Publish(v?interface{})?{ ?p.m.RLock() ?defer?p.m.RUnlock() ?var?wg?sync.WaitGroup ?for?sub,?topic?:=?range?p.subscribers?{ ??wg.Add(1) ??go?p.sendTopic(sub,?topic,?v,?&wg) ?} ?wg.Wait() } //?關(guān)閉發(fā)布者對(duì)象,同時(shí)關(guān)閉所有的訂閱者管道。 func?(p?*Publisher)?Close()?{ ?p.m.Lock() ?defer?p.m.Unlock() ?for?sub?:=?range?p.subscribers?{ ??delete(p.subscribers,?sub) ??close(sub) ?} } //?發(fā)送主題,可以容忍一定的超時(shí) func?(p?*Publisher)?sendTopic(sub?subscriber,?topic?topicFunc,?v?interface{},?wg?*sync.WaitGroup)?{ ?defer?wg.Done() ?if?topic?!=?nil?&&?!topic(v)?{ ??return ?} ?//監(jiān)聽sub?chan寫入成功或超時(shí) ?select?{ ?case?sub?<-?v: ?case?<-time.After(p.timeout): ?} }我們可以選擇訂閱全部,或指定自定義函數(shù)只訂閱符合要求的消息,返回chan對(duì)象:
all?:=?p.Subscribe()?//添加一個(gè)訂閱者,訂閱全部消息 //添加一個(gè)訂閱者,只關(guān)系有g(shù)olang字符串的內(nèi)容 golang?:=?p.SubscribeTopic(func(v?interface{})?bool?{ ?if?s,?ok?:=?v.(string);?ok?{ ??return?strings.Contains(s,?"golang") ?} ?return?false }) 編輯:黃飛?
?
?
評(píng)論