并发性: 在于编写代码时,可以独立编排并执行的控制流,可一次同时处理很多事情,
并行性: 在于代码运行时,多个计算同时运行,同时处理很多事情; 由此可发现并发适合并行执行;
Type Ctrl-\to kill a program and dump all its goroutine stacks. 输入 Ctrl-\ 终止程序,并且将所有的goroutine堆栈信息输出
Use the HTTP server’s /debug/pprof/goroutine to inspect live goroutine stacks. 使用http服务器的 /debug/pprof/goroutine 检查实时的goroutine堆栈;
Use a buffered channel as a concurrent blocking queue. 使用有缓冲的channel作为并发的阻塞队列;
Think carefully before introducing unbounded queuing. 在使用无界队列时,需要仔细思考!因为大部分情况下这不是最优解;
Close a channel to signal that no more values will be sent. 关闭channel,可以发出不再发生值的信号;
Stop timers you don’t need. 当不需要定时器时,一定要记得调用stop;
Prefer defer for unlocking mutexes. 如果锁是在离开函数后解锁,尽量使用defer+解锁,这样可以保证发生任何情况时,只要离开了函数(不是块)就会解锁;
Use a mutex/goto/goroutines&&channels&&mutexes if that is the clearest way to write the code. 如果使用互斥锁或者goto或者goroutine,channel,mutex一起,可以使程序清晰,当然可以使用;
用代码表示状态
程序状态存储在数据中,即state;
state := 0
for {
c := readChar()
switch state {
case 0:
if c != '"' {
return false
}
state = 1
case 1:
if c == '"' {
return true
}
if c == '\\' {
state = 2
} else {
state = 1
}
case 2:
state = 1
}
}
将状态存储在代码中,如下,将状态放进控制流中; 可以开一个goroutine保持状态(如果状态无法保持在栈中);
use additional goroutines to hold additional code state.
if readChar() != '"' {
return false // <- state == 0
}
var c rune
for c != '"' {
c := readChar() // <- state == 1
if c == '\\' {
readChar() // <- state == 2
}
}
return true
发布订阅服务器
发布者和订阅者通过发布订阅服务器连接;
注:实际上发布订阅服务器需要根据事件的类型过滤事件,现在假设单独处理过滤,即只实现发布和订阅;
// 需要实现的接口
type PubSub interface {
// 将事件e发布到所有当前订阅。
Publish(e Event)
// 订阅者 c 接收将来的事件。
// 所有订阅者都以相同的顺序接收事件,并且该顺序遵循程序顺序:
// 如果Publish(e1)发生在Publish(e2)之前,
// 则订阅者在e2之前接收e1。
Subscribe(c chan<- Event)
// 取消之前对频道C的订阅。
// 在c上发送了任何挂起的已发布事件后,
// 服务器将通过关闭c来发出取消订阅的信号。
Cancel(c chan<- Event)
}
注意:信息只在channel上单向流动;
type Server struct {
mu sync.Mutex
sub map[chan<- Event]bool
}
func (s *Server) Init() {
s.sub = make(map[chan<-Event]bool)
}
func (s *Server) Publish(e Event) {
s.mu.Lock()
defer s.mu.Unlock()
for c := range s.sub {
c <-e
}
}
func (s *Server) Subscribe(c chan<- Event) {
s.mu.Lock()
defer s.mu.Unlock()
if s.sub[c]{
panic("pubsub: already subscribed")
}
s.sub[c] = true
}
func (s *Server) Cancel(c chan<- Event) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.sub[c] {
panic("pubsub: not subscribed")
}
close(c)
delete(s.sub,c)
}
注:需要考虑:存在运行速度较慢的goroutine的情况,如果存在这种情况,解决方案:
降低事件发布的速度;
丢弃(合并)事件(ex:os/signal,runtime/pprof);
对任意数量的事件进行排序(在使用无界限的channel前要考虑清楚,因为它几乎不是最正确的选择);
type Server struct {
publish chan Event
subscribe chan subReq
cancel chan subReq
}
type subReq struct {
c chan<- Event
ok chan bool
}
func (s *Server) Init() {
s.publish = make(chan Event)
s.subscribe = make(chan subReg)
s.cancel = make(chan subReq)
go s.loop()
}
// 将Publish、Subscribe和Cancel合并
// 原有的订阅者元数据从全局变量变为属于s.loop了
func (s *Server) loop() {
sub := make(map[chan<-Event]chan<- Event)
for {
select {
case e :=<-s.publish:
for _, h := range sub {
h <- e
}
case r := <-s.subscribe:
if sub[r.c] != nil {
r.ok <- false
break
}
h = make(chan Event)
go helper(h, r.c)
sub[r.c] = h
r.ok <- true
case c := <-s.cancel:
if !sub[r.c] == nil{
r.ok <- false
break
}
close(sub[r.c])
delete(sub,r.c)
r.ok <- true
}
}
}
func (s *Server) Publish(e Event){
s.publish <-e
}
func (s *Server) Subscribe(c chan<- Event) {
r := subReq{c: c, ok: make(chan bool)}
s.subscribe <- r
if !<-r.ok{
panic("pubsub:already subscribed")
}
}
func (s *Server)Cancel(c chan<-Event){
r := subReq{c: c, ok: make(chan bool)}
s.cancel <- r
if !<-r.ok {
panic("pubsub:not subscribed")
}
}
// 防止订阅者接收过慢导致阻塞
func helper(in <-chan Event,
out chan<- Event){
var q []Event
for in!= nil && leen(q) > 0 {
// Decide whether and what to send.
var sendOut chan<- Event
var next Event
if len(q) > 0 {
sendOut out
next =q[0]
}
select {
case e, ok := <-in:
if !ok {
in = nil // stop receiving from in
break
}
q = append(q,e)
case sendOut <- next:
q = q[1:]
}
}
close(out)
}
由一个goroutine操作资源,可以避免数据竞争,即发布和订阅不能同时发生;
可以使用goroutine分离独立的key pooint;
工作调度器
func Schedule(servers []string,numTask int,
call func(srv string,task int)) {
idle := make(chan string, len(servers))
for _, srv := range servers {
idle <- srv
}
for task := 0; task < numTask; task++ {
task := task
// 当没有空闲服务器可用时,将会阻塞在此
// 如果放入goroutine中,就会创建超出空闲服务器数量的goroutine
srv := <-idle
go func() {
// task是创建goroutine时,
// 捕捉到的当前函数内的局部变量的引用
call(srv, task)
idle <- srv
}
}
for i := 0; i < len(servers); i++ {
<-idle
}
}
可以注意到现在是为每个任务分配一个goroutine,可以改进为:为每个服务器分配一个goroutine;
// 不使用channel的缓冲区
func Schedule(servers chan string, numTask int
call func(srv string, task int)) {
// 如果开辟缓冲区,就可以将任务写入队列后,再去计数done,
// 主goroutine不会阻塞在写入任务,即不会发生死锁
work := make(chan int)
done := make(chan bool)
runTasks := func(srv string) {
for task := range work {
if call(srv, task) {
done <- true
} else {
work <- task
}
}
}
go func() {
for {
select {
case srv := <-servers:
go runTasks(srv)
case <-exit:
return
}
}
}()
// ----
// 第一种写法:
// 使用select,可以同时 接收服务器的done 及 向服务器发送任务;
i := 0
WorkLoop:
for task := 0; task < numTask; task++ {
for {
select {
case work <- task:
continue WorkLoop
case <-done:
i++
}
}
}
// -----
// 第二种写法(思路:可以容忍让goroutine等待被处理,
// 在这里的具体思路是可以接收服务器在done上阻塞一会):
go func() {
for task := 0; task < numTask; task++{
work <- task
}
}()
// -----
for ; i < numTask; i++{
<-done
}
close(work)
exit <- true // 告诉调度器任务结束,不需要启用空闲服务器了
}
PS. 这样写之所以不会报错,是因为在for上的task在每次循环都会重新创建并复制(1.21,1.22的for每次迭代时都会为变量分配单独的地址空间),内部的task仅作用于for内部,故只是隐藏了for上的task;
for task := 0; task < 10; task++ {
task := task
go func() {
fmt.Println(task)
}()
}
复制服务客户端
希望服务的可靠性是依赖于复制; 客户端可以选择和其中任意服务器交互;
type ReplicatedClient interface {
// Init初始化客户端以使用给定的服务器;
// 要稍后发出特定请求,
// 客户端可以使用callOne(srv,args),
// 其中srv是列表中的服务器之一;
Init(servers []string, callOne func(string, Args) Reeply)
// 调用使请求成为可用的服务器;
// 多个goroutine可以同时调用call
Call(args Args) Reply
}
type Client struct {
servers []string
callOne func(string, Args) Reply
mu sync.Mutex
prefer int
}
func (c *Client) Init(servers []string, callOne func(string, Args) Reply) {
c.servers = servers
c.callOne = callOne
}
// 注意done有足够的缓冲区,避免goroutine阻塞;
// Call仅会返回一个服务器的响应,剩余的响应发送到done后,
// 会连通done一起被回收
func (c *client)Call(argsArgs)Reply {
type result struct {
serverID int
reply Reply
}
const timeout = 1 * time.Second
t := time.NewTimer(timeout)
defer t.Stop()
done := make(chan result, len(c.servers))
c.mu.Lock()
prefer := c.prefer
c.mu.Unlock()
var r result
for off := 0; off < len(c.servers); off++ {
id := (prefer + off) % len(c.servers)
go func() {
done <-result{id, c.callOne(c.servers[id], args)}
}()
select {
case r := <-done:
goto Done
case <-t.C:
//timeout
t.Reset(timeout)
}
}
r := <-done
Done:
c.mu.Lock()
c.prefer = r.serverID
c.mu.Unlock()
return r.reply
}
调用call后,客户端会寻找合适的服务器,并且长时间使用同一个服务器,直到服务器不合适,不会返回使用哪一个服务器; 定时器为何不能自动被回收: 正常情况下计时器被runtime引用,存放在active的计数器列表中,调用stop时等效于将计时器从active计数器列表中删除; 之所以不能自动回收:因为GC无法区分runtime的引用和程序其余部分的引用,所以channel无法被自动回收;
协议选择器
type ProtocolMux interface {
// 初始化mux以管理到给定服务的消息;
Init(Service)
// 使用给定的消息发出请求并返回回复;
// 多个goroutine可以同时调用Call;
Call(Msg)Msg
}
type Service interface {
// 在请求或回复消息中返回复用标识符;
// 多个goroutine可以同时调用ReadTag;
ReadTag(Msg) int64
// 向远程服务发送请求消息。
// 发送不能与自身同时调用。
Send(Msg)
// Recv等待并返回来自远程服务的回复消息。
// Recv不能与其本身同时调用。
Recv() Msg
}
type Mux struct {
srv Service
send chan Msg
mu sync.Mutes
pending map[int64]chan<- Msg
}
func (m *Mux) Init(srv Service) {
m.srv = srv
m.pending = make(map[int64]chan Msg)
go m.sendLoop()
go m.recvLoop()
}
// 序列化
func (m *Mux) sendLoop() {
for args := range m.send {
m.srv.Send(args)
}
}
// 序列化
func (m *Mux) recvLoop() {
for {
reply := m.srv.Recv()
tag := m.srv.Tag(reply)
m.mu.Lock()
done := m.pending[tag]
delete(m.pending, tag)
m.mu.Unlock()
if done == nil {
panic("unexpected reply")
}
done <- reply
}
}
func (m *Mux) Call(args Msg) (reply Msg) {
tag := m.srv.ReadTag(args)
done := make(chan Msg, 1)
m.mu.Lock()
if m.pending[tag] != nil {
m.mu.Unlock()
panic("mux: duplicate call tag")
}
m.pending[tag] = done
m.mu.Unlock()
m.send <- args
return <-done
}