上一篇(L9)

下一篇(L11)

并发性: 在于编写代码时,可以独立编排并执行的控制流,可一次同时处理很多事情,

并行性: 在于代码运行时,多个计算同时运行,同时处理很多事情; 由此可发现并发适合并行执行;

Type Ctrl-\to kill a program and dump all its goroutine stacks. 输入 Ctrl-\ 终止程序,并且将所有的goroutine堆栈信息输出

Use the HTTP server’s /debug/pprof/goroutine to inspect live goroutine stacks. 使用http服务器的 /debug/pprof/goroutine 检查实时的goroutine堆栈;

Use a buffered channel as a concurrent blocking queue. 使用有缓冲的channel作为并发的阻塞队列;

Think carefully before introducing unbounded queuing. 在使用无界队列时,需要仔细思考!因为大部分情况下这不是最优解;

Close a channel to signal that no more values will be sent. 关闭channel,可以发出不再发生值的信号;

Stop timers you don’t need. 当不需要定时器时,一定要记得调用stop;

Prefer defer for unlocking mutexes. 如果锁是在离开函数后解锁,尽量使用defer+解锁,这样可以保证发生任何情况时,只要离开了函数(不是块)就会解锁;

Use a mutex/goto/goroutines&&channels&&mutexes if that is the clearest way to write the code. 如果使用互斥锁或者goto或者goroutine,channel,mutex一起,可以使程序清晰,当然可以使用;


用代码表示状态

程序状态存储在数据中,即state;

state := 0
for {
    c := readChar()
    switch state {
    case 0:
        if c != '"' {
            return false
        }  
        state = 1
    case 1:
        if c == '"' {
            return true
        }
        if c == '\\' {
            state = 2
        } else {
            state = 1
        }
    case 2:
        state = 1
    }
}

将状态存储在代码中,如下,将状态放进控制流中; 可以开一个goroutine保持状态(如果状态无法保持在栈中);

use additional goroutines to hold additional code state.


if readChar() != '"' {
    return false			// <- state == 0
}

var c rune
for c != '"' {
    c := readChar()			// <- state == 1
    if c == '\\' {
        readChar()			// <- state == 2
    }
}
return true


发布订阅服务器

发布者和订阅者通过发布订阅服务器连接;

注:实际上发布订阅服务器需要根据事件的类型过滤事件,现在假设单独处理过滤,即只实现发布和订阅;


// 需要实现的接口
type PubSub interface {
    // 将事件e发布到所有当前订阅。
    Publish(e Event)
    
    // 订阅者 c 接收将来的事件。
    // 所有订阅者都以相同的顺序接收事件,并且该顺序遵循程序顺序:
    // 如果Publish(e1)发生在Publish(e2)之前,
    // 则订阅者在e2之前接收e1。
    Subscribe(c chan<- Event)
    
    // 取消之前对频道C的订阅。
    // 在c上发送了任何挂起的已发布事件后,
    // 服务器将通过关闭c来发出取消订阅的信号。
    Cancel(c chan<- Event)
}

注意:信息只在channel上单向流动;


type Server struct {
    mu sync.Mutex
    sub map[chan<- Event]bool
}

func (s *Server) Init() {
    s.sub = make(map[chan<-Event]bool)
}

func (s *Server) Publish(e Event) {
    s.mu.Lock()
    defer s.mu.Unlock()
        
    for c := range s.sub {
        c <-e
    }
}

func (s *Server) Subscribe(c chan<- Event) {
    s.mu.Lock()
    defer s.mu.Unlock()
        
    if s.sub[c]{
        panic("pubsub: already subscribed")
    }
    s.sub[c] = true
}
    
func (s *Server) Cancel(c chan<- Event) {
    s.mu.Lock()
    defer s.mu.Unlock()
    
    if !s.sub[c] {
        panic("pubsub: not subscribed")
    }
    close(c)
    delete(s.sub,c)
}

注:需要考虑:存在运行速度较慢的goroutine的情况,如果存在这种情况,解决方案: 降低事件发布的速度; 丢弃(合并)事件(ex:os/signal,runtime/pprof); 对任意数量的事件进行排序(在使用无界限的channel前要考虑清楚,因为它几乎不是最正确的选择);

type Server struct {
    publish chan Event
    subscribe chan subReq
    cancel chan subReq
}

type subReq struct {
    c chan<- Event
    ok chan bool
}

func (s *Server) Init() {
    s.publish = make(chan Event)
    s.subscribe = make(chan subReg)
    s.cancel = make(chan subReq)
    go s.loop()
}

// 将Publish、Subscribe和Cancel合并
// 原有的订阅者元数据从全局变量变为属于s.loop了
func (s *Server) loop() {
    sub := make(map[chan<-Event]chan<- Event)
    for {
        select {
            case e :=<-s.publish:
                for _, h := range sub {
                    h <- e
                }
            case r := <-s.subscribe:
                if sub[r.c] != nil {
                    r.ok <- false
                    break
                }
                h = make(chan Event)
                go helper(h, r.c)
                sub[r.c] = h
                r.ok <- true
                    
            case c := <-s.cancel:
            if !sub[r.c] == nil{
                r.ok <- false
                break
            }
            close(sub[r.c])
            delete(sub,r.c)
            r.ok <- true
        }
    }
}

func (s *Server) Publish(e Event){
    s.publish <-e
}

func (s *Server) Subscribe(c chan<- Event) {
    r := subReq{c: c, ok: make(chan bool)}
    s.subscribe <- r
    if !<-r.ok{
        panic("pubsub:already subscribed")
    }
}

func (s *Server)Cancel(c chan<-Event){
    r := subReq{c: c, ok: make(chan bool)}
    s.cancel <- r
    if !<-r.ok {
        panic("pubsub:not subscribed")
    }
}

// 防止订阅者接收过慢导致阻塞
func helper(in <-chan Event,
    out chan<- Event){
    var q []Event
    for in!= nil && leen(q) > 0 {
        // Decide whether and what to send.
        var sendOut chan<- Event
        var next Event
        if len(q) > 0 {
            sendOut out
            next =q[0]
        }
            
        select {
        case e, ok := <-in:
            if !ok {
                in = nil // stop receiving from in
                break
            }
            q = append(q,e)
        case sendOut <- next:
            q = q[1:]
        }    
    }
    close(out)
}

由一个goroutine操作资源,可以避免数据竞争,即发布和订阅不能同时发生;

可以使用goroutine分离独立的key pooint;


工作调度器

func Schedule(servers []string,numTask int,
    call func(srv string,task int)) {
    
    idle := make(chan string, len(servers))
    for _, srv := range servers {
        idle <- srv
    }
    for task := 0; task < numTask; task++ {       
        task := task
        // 当没有空闲服务器可用时,将会阻塞在此
        // 如果放入goroutine中,就会创建超出空闲服务器数量的goroutine
        srv := <-idle
        go func() {
            // task是创建goroutine时,
            // 捕捉到的当前函数内的局部变量的引用
            call(srv, task)
            idle <- srv
        }
    }

    for i := 0; i < len(servers); i++ {
        <-idle
    }
}

可以注意到现在是为每个任务分配一个goroutine,可以改进为:为每个服务器分配一个goroutine;

// 不使用channel的缓冲区
func Schedule(servers chan string, numTask int
            call func(srv string, task int)) {

    // 如果开辟缓冲区,就可以将任务写入队列后,再去计数done,
    // 主goroutine不会阻塞在写入任务,即不会发生死锁
    work := make(chan int)
    done := make(chan bool)
    
    runTasks := func(srv string) {
        for task := range work {
            if call(srv, task) {
                done <- true
            } else {
                work <- task
            }
        }
    }
    
    go func() {
        for {
            select {
            case srv := <-servers:
                go runTasks(srv)
            case <-exit:
                return
            }
        }
    }()

    // ----
    // 第一种写法:
    // 使用select,可以同时 接收服务器的done 及 向服务器发送任务;
    i := 0
WorkLoop:
    for task := 0; task < numTask; task++ {
        for {
            select {
            case work <- task:
                continue WorkLoop
            case <-done:
                i++
            }
        }
    }
    // -----
    // 第二种写法(思路:可以容忍让goroutine等待被处理,
    // 在这里的具体思路是可以接收服务器在done上阻塞一会):
    go func() {
        for task := 0; task < numTask; task++{
            work <- task
        }
    }()
    // -----

    
    for ; i < numTask; i++{
        <-done
    }
    close(work)
    exit <- true // 告诉调度器任务结束,不需要启用空闲服务器了
}

PS. 这样写之所以不会报错,是因为在for上的task在每次循环都会重新创建并复制(1.21,1.22的for每次迭代时都会为变量分配单独的地址空间),内部的task仅作用于for内部,故只是隐藏了for上的task;

for task := 0; task < 10; task++ {
    task := task
    go func() {
        fmt.Println(task)
    }()
}


复制服务客户端

希望服务的可靠性是依赖于复制; 客户端可以选择和其中任意服务器交互;

type ReplicatedClient interface {
    // Init初始化客户端以使用给定的服务器;
    // 要稍后发出特定请求,
    // 客户端可以使用callOne(srv,args),
    // 其中srv是列表中的服务器之一;
    Init(servers []string, callOne func(string, Args) Reeply)

    // 调用使请求成为可用的服务器;
    // 多个goroutine可以同时调用call
    Call(args Args) Reply
}

type Client struct {
    servers []string
    callOne func(string, Args) Reply

    mu		sync.Mutex
    prefer	int
}

func (c *Client) Init(servers []string, callOne func(string, Args) Reply) {
    c.servers = servers
    c.callOne = callOne
}

// 注意done有足够的缓冲区,避免goroutine阻塞;
// Call仅会返回一个服务器的响应,剩余的响应发送到done后,
// 会连通done一起被回收
func (c *client)Call(argsArgs)Reply {
    type result struct {
        serverID int
        reply Reply
    }
    
    const timeout = 1 * time.Second
    t := time.NewTimer(timeout)
    defer t.Stop()
    
    done := make(chan result, len(c.servers))

    c.mu.Lock()
    prefer := c.prefer
    c.mu.Unlock()

    var r result
    for off := 0; off < len(c.servers); off++ {
    id := (prefer + off) % len(c.servers)
    go func() {
        done <-result{id, c.callOne(c.servers[id], args)}
    }()
        
    select {
    case r := <-done:
        goto Done
    case <-t.C:
        //timeout
        t.Reset(timeout)
        }
    }
    r := <-done
Done:
    c.mu.Lock()
    c.prefer = r.serverID
    c.mu.Unlock()
    return r.reply
}
    调用call后,客户端会寻找合适的服务器,并且长时间使用同一个服务器,直到服务器不合适,不会返回使用哪一个服务器;
定时器为何不能自动被回收:
    正常情况下计时器被runtime引用,存放在active的计数器列表中,调用stop时等效于将计时器从active计数器列表中删除;
    之所以不能自动回收:因为GC无法区分runtime的引用和程序其余部分的引用,所以channel无法被自动回收;


协议选择器

type ProtocolMux interface {
    // 初始化mux以管理到给定服务的消息;
    Init(Service)
    
    // 使用给定的消息发出请求并返回回复;
    // 多个goroutine可以同时调用Call;
    Call(Msg)Msg
}

type Service interface {
    // 在请求或回复消息中返回复用标识符;
    // 多个goroutine可以同时调用ReadTag;
    ReadTag(Msg) int64
        
    // 向远程服务发送请求消息。
    // 发送不能与自身同时调用。
    Send(Msg)
       
    // Recv等待并返回来自远程服务的回复消息。
    // Recv不能与其本身同时调用。
    Recv() Msg
}

type Mux struct {
    srv 	Service
    send 	chan Msg

    mu		sync.Mutes
    pending map[int64]chan<- Msg
}

func (m *Mux) Init(srv Service) {
    m.srv = srv
    m.pending = make(map[int64]chan Msg)
    go m.sendLoop()
    go m.recvLoop()
}

// 序列化
func (m *Mux) sendLoop() {
    for args := range m.send {
        m.srv.Send(args)
    }
}

// 序列化
func (m *Mux) recvLoop() {
    for {
        reply := m.srv.Recv()
        tag := m.srv.Tag(reply)

        m.mu.Lock()
        done := m.pending[tag]
        delete(m.pending, tag)
        m.mu.Unlock()

        if done == nil {
            panic("unexpected reply")
        }
        done <- reply
    }
}

func (m *Mux) Call(args Msg) (reply Msg) {
    tag := m.srv.ReadTag(args)
    done := make(chan Msg, 1)

    m.mu.Lock()
    if m.pending[tag] != nil {
        m.mu.Unlock()
        panic("mux: duplicate call tag")
    }
    m.pending[tag] = done
    m.mu.Unlock()

    m.send <- args
    return <-done
}