编辑
2025-02-27
技术学习
00
请注意,本文编写于 59 天前,最后修改于 59 天前,其中某些信息可能已经过时。

目录

golang 并发编程相关
1 WaitGroup
1.1 信号量
1.2 WaitGroup数据结构
1.3 Add(delta int)
1.4 Wait()
1.5 Done()
1.6 其他同步原语与锁
2 context
2.1 实现原理
2.1.1 接口定义
2.2 空context
2.3 cancelCtx
2.3.1 Done()接口实现
2.3.2 Err()接口实现
2.3.3 cancel()接口实现
2.3.4 WithCancel()方法实现
2.4 timerCtx
2.4.1 Deadline()接口实现
2.4.2 cancel()接口实现
2.4.3 WithDeadline()方法实现
2.4.4 WithTimeout()方法实现
2.5 valueCtx
2.5.1 Value()接口实现
2.5.2 WithValue()方法实现
2.6 总结

golang 并发编程相关

Tags: Golang

Published: 2023年10月11日

概要: Golang并发编程相关的博客,介绍了WaitGroup、Context和同步原语与锁的使用。

1 WaitGroup

1.1 信号量

信号量是Unix系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源。

可简单理解为信号量为一个数值:

  • 当信号量>0时,表示资源可用,获取信号量时系统自动将信号量减1;
  • 当信号量==0时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒;

1.2 WaitGroup数据结构

go
type WaitGroup struct { state1 [3]uint32 }

state1是个长度为3的数组,其中包含了state和一个信号量,而state实际上是两个计数器:

  • counter: 当前还未执行结束的goroutine计数器
  • waiter count: 等待goroutine-group结束的goroutine数量,即有多少个等候者
  • semaphore: 信号量

Untitled.png

WaitGroup对外提供三个接口:

  • Add(delta int): 将delta值加到counter中
  • Wait(): waiter递增1,并阻塞等待信号量semaphore
  • Done(): counter递减1,按照waiter数值释放相应次数信号量

1.3 Add(delta int)

Add()做了两件事,一是把delta值累加到counter中,因为delta可以为负值,也就是说counter有可能变成0或负值,所以第二件事就是当counter值变为0时,根据waiter数值释放等量的信号量,把等待的goroutine全部唤醒,如果counter变为负值,则panic.

go
func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() //获取state和semaphore地址指针 state := atomic.AddUint64(statep, uint64(delta)<<32) //把delta左移32位累加到state,即累加到counter中 v := int32(state >> 32) //获取counter值 w := uint32(state) //获取waiter值 if v < 0 { //经过累加后counter值变为负值,panic panic("sync: negative WaitGroup counter") } //经过累加后,此时,counter >= 0 //如果counter为正,说明不需要释放信号量,直接退出 //如果waiter为零,说明没有等待者,也不需要释放信号量,直接退出 if v > 0 || w == 0 { return } //此时,counter一定等于0,而waiter一定大于0(内部维护waiter,不会出现小于0的情况), //先把counter置为0,再释放waiter个数的信号量 *statep = 0 for ; w != 0; w-- { runtime_Semrelease(semap, false) //释放信号量,执行一次释放一个,唤醒一个等待者 } }

1.4 Wait()

Wait()方法也做了两件事,一是累加waiter, 二是阻塞等待信号量

go
func (wg *WaitGroup) Wait() { statep, semap := wg.state() //获取state和semaphore地址指针 for { state := atomic.LoadUint64(statep) //获取state值 v := int32(state >> 32) //获取counter值 w := uint32(state) //获取waiter值 if v == 0 { //如果counter值为0,说明所有goroutine都退出了,不需要待待,直接返回 return } // 使用CAS(比较交换算法)累加waiter,累加可能会失败,失败后通过for loop下次重试 if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(semap) //累加成功后,等待信号量唤醒自己 return } } }

1.5 Done()

Done()只做一件事,即把counter减1,我们知道Add()可以接受负值,所以Done实际上只是调用了Add(-1)。

func (wg *WaitGroup) Done() { wg.Add(-1) }

Done()的执行逻辑就转到了Add(),实际上也正是最后一个完成的goroutine把等待者唤醒的。

1.6 其他同步原语与锁

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/

https://www.topgoer.cn/docs/gozhuanjia/gozhuanjiamutex

2 context

Golang context是Golang应用开发常用的并发控制技术,它与WaitGroup最大的不同点是context对于派生goroutine有更强的控制力,它可以控制多级的goroutine。

context翻译成中文是”上下文”,即它可以控制一组呈树状结构的goroutine,每个goroutine拥有相同的上下文。

典型的使用场景如下图所示:

Untitled 1.png

上图中由于goroutine派生出子goroutine,而子goroutine又继续派生新的goroutine,这种情况下使用WaitGroup就不太容易,因为子goroutine个数不容易确定。而使用context就可以很容易实现。

2.1 实现原理

context实际上只定义了接口,凡是实现该接口的类都可称为是一种context,官方包中实现了几个常用的context,分别可用于不同的场景。

2.1.1 接口定义

源码包中src/context/context.go:Context 定义了该接口:

go
type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }

基础的context接口只定义了4个方法,下面分别简要说明一下:

Deadline()

该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline,则ok == false,此时deadline为一个初始值的time.Time值

Done()

该方法返回一个channel,需要在select-case语句中使用,如”case <-context.Done():”。

当context关闭后,Done()返回一个被关闭的管道,关闭的管道仍然是可读的,据此goroutine可以收到关闭请求;
当context还未关闭时,Done()返回nil。

Err()

该方法描述context关闭的原因。关闭原因由context实现控制,不需要用户设置。比如Deadline context,关闭原因可能是因为deadline,也可能提前被主动关闭,那么关闭原因就会不同:

  • 因deadline关闭:“context deadline exceeded”;
  • 因主动关闭: “context canceled”。

当context关闭后,Err()返回context的关闭原因;
当context还未关闭时,Err()返回nil;

Value()

有一种context,它不是用于控制呈树状分布的goroutine,而是用于在树状分布的goroutine间传递信息。

Value()方法就是用于此种类型的context,该方法根据key值查询map中的value。具体使用后面示例说明。

2.2 空context

context包中定义了一个空的context, 名为emptyCtx,用于context的根节点,空的context只是简单的实现了Context,本身不包含任何值,仅用于其他context的父节点。

emptyCtx类型定义如下代码所示:

go
type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key interface{}) interface{} { return nil }

context包中定义了一个公用的emptCtx全局变量,名为background,可以使用context.Background()获取它,实现代码如下所示:

go
var background = new(emptyCtx) func Background() Context { return background }

context包提供了4个方法创建不同类型的context,使用这四个方法时如果没有父context,都需要传入backgroud,即backgroud作为其父节点:

  • WithCancel()
  • WithDeadline()
  • WithTimeout()
  • WithValue()

context包中实现Context接口的struct,除了emptyCtx外,还有cancelCtx、timerCtx和valueCtx三种,正是基于这三种context实例,实现了上述4种类型的context。

context包中各context类型之间的关系,如下图所示:

Untitled 2.png

struct cancelCtx、timerCtx、valueCtx都继承于Context,下面分别介绍这三个struct。

2.3 cancelCtx

源码包中src/context/context.go:cancelCtx 定义了该类型context:

go
type cancelCtx struct { Context mu sync.Mutex // protects following fields done chan struct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call }

children中记录了由此context派生的所有child,此context被cancel时会把其中的所有child都cancel掉。

cancelCtx与deadline和value无关,所以只需要实现Done()和Err()外露接口即可。

2.3.1 Done()接口实现

按照Context定义,Done()接口只需要返回一个channel即可,对于cancelCtx来说只需要返回成员变量done即可。

这里直接看下源码,非常简单:

go
func (c *cancelCtx) Done() <-chan struct{} { c.mu.Lock() if c.done == nil { c.done = make(chan struct{}) } d := c.done c.mu.Unlock() return d }

由于cancelCtx没有指定初始化函数,所以cancelCtx.done可能还未分配,所以需要考虑初始化。
cancelCtx.done会在context被cancel时关闭,所以cancelCtx.done的值一般经历如下三个阶段:
nil –> chan struct{} –> closed chan。

2.3.2 Err()接口实现

按照Context定义,Err()只需要返回一个error告知context被关闭的原因。对于cancelCtx来说只需要返回成员变量err即可。

还是直接看下源码:

go
func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err c.mu.Unlock() return err }

cancelCtx.err默认是nil,在context被cancel时指定一个error变量: var Canceled = errors.New("context canceled")

2.3.3 cancel()接口实现

cancel()内部方法是理解cancelCtx的最关键的方法,其作用是关闭自己和其后代,其后代存储在cancelCtx.children的map中,其中key值即后代对象,value值并没有意义,这里使用map只是为了方便查询而已。

cancel方法实现伪代码如下所示:

go
func (c *cancelCtx) cancel(removeFromParent bool, err error) { c.mu.Lock() c.err = err //设置一个error,说明关闭原因 close(c.done) //将channel关闭,以此通知派生的context for child := range c.children { //遍历所有children,逐个调用cancel方法 child.cancel(false, err) } c.children = nil c.mu.Unlock() if removeFromParent { //正常情况下,需要将自己从parent删除 removeChild(c.Context, c) } }

实际上,WithCancel()返回的第二个用于cancel context的方法正是此cancel()。

2.3.4 WithCancel()方法实现

WithCancel()方法作了三件事:

  • 初始化一个cancelCtx实例
  • 将cancelCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
  • 返回cancelCtx实例和cancel()方法

其实现源码如下所示:

go
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) //将自身添加到父节点 return &c, func() { c.cancel(true, Canceled) } }

这里将自身添加到父节点的过程有必要简单说明一下:

  1. 如果父节点也支持cancel,也就是说其父节点肯定有children成员,那么把新context添加到children里即可;
  2. 如果父节点不支持cancel,就继续向上查询,直到找到一个支持cancel的节点,把新context添加到children里;
  3. 如果所有的父节点均不支持cancel,则启动一个协程等待父节点结束,然后再把当前context结束。

2.4 timerCtx

源码包中src/context/context.go:timerCtx 定义了该类型context:

go
type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time }

timerCtx在cancelCtx基础上增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器。

由此,衍生出WithDeadline()和WithTimeout()。实现上这两种类型实现原理一样,只不过使用语境不一样:

  • deadline: 指定最后期限,比如context将2018.10.20 00:00:00之时自动结束
  • timeout: 指定最长存活时间,比如context将在30s后结束。

对于接口来说,timerCtx在cancelCtx基础上还需要实现Deadline()和cancel()方法,其中cancel()方法是重写的。

2.4.1 Deadline()接口实现

Deadline()方法仅仅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法设置的。

2.4.2 cancel()接口实现

cancel()方法基本继承cancelCtx,只需要额外把timer关闭。

timerCtx被关闭后,timerCtx.cancelCtx.err将会存储关闭原因:

  • 如果deadline到来之前手动关闭,则关闭原因与cancelCtx显示一致;
  • 如果deadline到来时自动关闭,则原因为:”context deadline exceeded”

2.4.3 WithDeadline()方法实现

WithDeadline()方法实现步骤如下:

  • 初始化一个timerCtx实例
  • 将timerCtx实例添加到其父节点的children中(如果父节点也可以被cancel的话)
  • 启动定时器,定时器到期后会自动cancel本context
  • 返回timerCtx实例和cancel()方法

也就是说,timerCtx类型的context不仅支持手动cancel,也会在定时器到来后自动cancel。

2.4.4 WithTimeout()方法实现

WithTimeout()实际调用了WithDeadline,二者实现原理一致。

看代码会非常清晰:

go
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }

2.5 valueCtx

源码包中src/context/context.go:valueCtx 定义了该类型context:

go
type valueCtx struct { Context key, val interface{} }

valueCtx只是在Context基础上增加了一个key-value对,用于在各级协程间传递一些数据。

由于valueCtx既不需要cancel,也不需要deadline,那么只需要实现Value()接口即可。

2.5.1 Value()接口实现

由valueCtx数据结构定义可见,valueCtx.key和valueCtx.val分别代表其key和value值。 实现也很简单:

go
func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) }

这里有个细节需要关注一下,即当前context查找不到key时,会向父节点查找,如果查询不到则最终返回interface{}。也就是说,可以通过子context查询到父的value值。

2.5.2 WithValue()方法实现

WithValue()实现也是非常的简单, 伪代码如下:

go
func WithValue(parent Context, key, val interface{}) Context { if key == nil { panic("nil key") } return &valueCtx{parent, key, val} }

2.6 总结

  • Context仅仅是一个接口定义,根据实现的不同,可以衍生出不同的context类型;
  • cancelCtx实现了Context接口,通过WithCancel()创建cancelCtx实例;
  • timerCtx实现了Context接口,通过WithDeadline()和WithTimeout()创建timerCtx实例;
  • valueCtx实现了Context接口,通过WithValue()创建valueCtx实例;
  • 三种context实例可互为父节点,从而可以组合成不同的应用形式;

本文作者:AstralDex

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!