Tags: Golang
Published: 2023年10月11日
概要: Golang并发编程相关的博客,介绍了WaitGroup、Context和同步原语与锁的使用。
信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。
可简单理解为信号量为一个数值:
gotype WaitGroup struct {
state1 [3]uint32
}
state1是个长度为3的数组,其中包含了state和一个信号量,而state实际上是两个计数器:
WaitGroup对外提供三个接口:
Add()做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或负值,所以第二件事就是当counter值变为0时,根据waiter数值释放等量的信号量,把等待的goroutine全部唤醒,如果counter变为负值,则panic.
gofunc (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() //获取state和semaphore地址指针
state := atomic.AddUint64(statep, uint64(delta)<<32) //把delta左移32位累加到state,即累加到counter中
v := int32(state >> 32) //获取counter值
w := uint32(state) //获取waiter值
if v < 0 { //经过累加后counter值变为负值,panic
panic("sync: negative WaitGroup counter")
}
//经过累加后,此时,counter >= 0
//如果counter为正,说明不需要释放信号量,直接退出
//如果waiter为零,说明没有等待者,也不需要释放信号量,直接退出
if v > 0 || w == 0 {
return
}
//此时,counter一定等于0,而waiter一定大于0(内部维护waiter,不会出现小于0的情况),
//先把counter置为0,再释放waiter个数的信号量
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false) //释放信号量,执行一次释放一个,唤醒一个等待者
}
}
Wait()方法也做了两件事,一是累加waiter, 二是阻塞等待信号量
gofunc (wg *WaitGroup) Wait() {
statep, semap := wg.state() //获取state和semaphore地址指针
for {
state := atomic.LoadUint64(statep) //获取state值
v := int32(state >> 32) //获取counter值
w := uint32(state) //获取waiter值
if v == 0 { //如果counter值为0,说明所有goroutine都退出了,不需要待待,直接返回
return
}
// 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop下次重试
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap) //累加成功后,等待信号量唤醒自己
return
}
}
}
Done()只做一件事,即把counter减1,我们知道Add()可以接受负值,所以Done实际上只是调用了Add(-1)。
func (wg *WaitGroup) Done() { wg.Add(-1) }
Done()的执行逻辑就转到了Add(),实际上也正是最后一个完成的goroutine把等待者唤醒的。
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/
https://www.topgoer.cn/docs/gozhuanjia/gozhuanjiamutex
Golang context是Golang应用开发常用的并发控制技术,它与WaitGroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine。
context翻译成中文是”上下文”,即它可以控制一组呈树状结构的goroutine,每个goroutine拥有相同的上下文。
典型的使用场景如下图所示:
上图中由于goroutine派生出子goroutine,而子goroutine又继续派生新的goroutine,这种情况下使用WaitGroup就不太容易,因为子goroutine个数不容易确定。而使用context就可以很容易实现。
context实际上只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别可用于不同的场景。
源码包中src/context/context.go:Context
定义了该接口:
gotype Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
基础的context接口只定义了4个方法,下面分别简要说明一下:
Deadline()
该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此时deadline为一个初始值的time.Time值
Done()
该方法返回一个channel,需要在select-case语句中使用,如”case <-context.Done():”。
当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;
当context还未关闭时,Done()返回nil。
Err()
该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如Deadline context,关闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同:
当context关闭后,Err()返回context的关闭原因;
当context还未关闭时,Err()返回nil;
Value()
有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息。
Value()方法就是用于此种类型的context,该方法根据key值查询map中的value。具体使用后面示例说明。
context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现了Context,本身不包含任何值,仅用于其他context的父节点。
emptyCtx类型定义如下代码所示:
gotype emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
context包中定义了一个公用的emptCtx全局变量,名为background,可以使用context.Background()获取它,实现代码如下所示:
govar background = new(emptyCtx)
func Background() Context {
return background
}
context包提供了4个方法创建不同类型的context,使用这四个方法时如果没有父context,都需要传入backgroud,即backgroud作为其父节点:
context包中实现Context接口的struct,除了emptyCtx外,还有cancelCtx、timerCtx和valueCtx三种,正是基于这三种context实例,实现了上述4种类型的context。
context包中各context类型之间的关系,如下图所示:
struct cancelCtx、timerCtx、valueCtx都继承于Context,下面分别介绍这三个struct。
源码包中src/context/context.go:cancelCtx
定义了该类型context:
gotype cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
children中记录了由此context派生的所有child,此context被cancel时会把其中的所有child都cancel掉。
cancelCtx与deadline和value无关,所以只需要实现Done()和Err()外露接口即可。
按照Context定义,Done()接口只需要返回一个channel即可,对于cancelCtx来说只需要返回成员变量done即可。
这里直接看下源码,非常简单:
gofunc (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
由于cancelCtx没有指定初始化函数,所以cancelCtx.done可能还未分配,所以需要考虑初始化。
cancelCtx.done会在context被cancel时关闭,所以cancelCtx.done的值一般经历如下三个阶段:
nil –> chan struct{} –> closed chan。
按照Context定义,Err()只需要返回一个error告知context被关闭的原因。对于cancelCtx来说只需要返回成员变量err即可。
还是直接看下源码:
gofunc (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
cancelCtx.err默认是nil,在context被cancel时指定一个error变量: var Canceled = errors.New("context canceled")
。
cancel()内部方法是理解cancelCtx的最关键的方法,其作用是关闭自己和其后代,其后代存储在cancelCtx.children的map中,其中key值即后代对象,value值并没有意义,这里使用map只是为了方便查询而已。
cancel方法实现伪代码如下所示:
gofunc (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
c.err = err //设置一个error,说明关闭原因
close(c.done) //将channel关闭,以此通知派生的context
for child := range c.children { //遍历所有children,逐个调用cancel方法
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent { //正常情况下,需要将自己从parent删除
removeChild(c.Context, c)
}
}
实际上,WithCancel()返回的第二个用于cancel context的方法正是此cancel()。
WithCancel()方法作了三件事:
其实现源码如下所示:
gofunc WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c) //将自身添加到父节点
return &c, func() { c.cancel(true, Canceled) }
}
这里将自身添加到父节点的过程有必要简单说明一下:
源码包中src/context/context.go:timerCtx
定义了该类型context:
gotype timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
timerCtx在cancelCtx基础上增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器。
由此,衍生出WithDeadline()和WithTimeout()。实现上这两种类型实现原理一样,只不过使用语境不一样:
对于接口来说,timerCtx在cancelCtx基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。
Deadline()方法仅仅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法设置的。
cancel()方法基本继承cancelCtx,只需要额外把timer关闭。
timerCtx被关闭后,timerCtx.cancelCtx.err将会存储关闭原因:
WithDeadline()方法实现步骤如下:
也就是说,timerCtx类型的context不仅支持手动cancel,也会在定时器到来后自动cancel。
WithTimeout()实际调用了WithDeadline,二者实现原理一致。
看代码会非常清晰:
gofunc WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
源码包中src/context/context.go:valueCtx
定义了该类型context:
gotype valueCtx struct {
Context
key, val interface{}
}
valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。
由于valueCtx既不需要cancel,也不需要deadline,那么只需要实现Value()接口即可。
由valueCtx数据结构定义可见,valueCtx.key和valueCtx.val分别代表其key和value值。 实现也很简单:
gofunc (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
这里有个细节需要关注一下,即当前context查找不到key时,会向父节点查找,如果查询不到则最终返回interface{}。也就是说,可以通过子context查询到父的value值。
WithValue()实现也是非常的简单, 伪代码如下:
gofunc WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
return &valueCtx{parent, key, val}
}
本文作者:AstralDex
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!