一个特定类型的方法和操作函数是并发安全的,那么所有它的访问方法和操作都是并发安全的。导出包级别的函数一般情况下都是并发安全的,package级的变量没法被限制在单一的goroutine,所以修改这些变量必须使用互斥条件。
竞争条件指的是程序在多个goroutine交叉执行操作时,没有给出正确的结果。只要有两个goroutine并发访问同一个变量,且至少其中的一个是写操作的时候就会发生数据竞争。
避免数据竞争的方法:
>> 方法不要去写变量,此时指只在第一次创建时写入,后续不再对该变量进行修改。
>> 避免从多个goroutine访问变量。go语言中推崇的就是不使用共享数据来通信,使用通信来共享数据。一个提供对指定的变量通过channel来请求的goroutine叫做变量的监控。将变量限定在一个单独的goroutine中,通常是采用channel管道组成的流水线等。
>> 允许多个goroutine区访问变量,但在同一个时刻最多只有一个goroutine在访问,这种方式成为互斥访问。这种就是sync包中提供的一些功能。
sync包里的Mutex类型直接支持互斥,Lock方法能够获取到锁,Unlock方法会释放这个锁。Mutex会包含共享变量。被Mutex保护的变量是在mutex变量声明之后立即声明的。在Lock和Unlock之间的代码段中的内容goroutine可以随便读取或者修改,这个代码段叫做临界区。goroutine在结束后必须释放锁是必要的。即使在错误路径中也要释放。可以使用defer来调用Unlock,临界区会隐式地延伸到函数作用域的最后。go没有重入锁,也即是没法对一个已锁上的mutex再次上锁,这样会导致程序死锁。
在sync包中提供了多读少写场景的多读单写锁sync.RWMutex. RLock和RUnlock方法获取和释放一个去读或者共享锁,而调用Lock和Unlock方法获取和释放一个写或互斥锁。RLock只能在临界区共享变量没有任何写入操作时可用。RWMutex只有当获得锁的大部分goroutine都是读操作,而锁在竞争条件下,goroutine必须等待才能获取到锁的时候,RWMutex才是最能带来好处的。RWMutex需要更复杂的内部记录,因此会比一般的无竞争的锁mutex慢一些。
同步不仅仅是一堆goroutine执行顺序的问题,还会涉及到内存的问题,现代计算机中可能会有多个处理器,每个处理器都有其本地缓存。为了效率队内存的写入一般会在每个处理器中缓冲,并在必要时一起flush到主存。这种情况下数据可能会以与当初goroutine写入顺序不同的顺序提交到主存。channel通信或者互斥量操作这些原语会使处理器将其聚集的写入flush并commit,这样goroutine在某个时间点上的执行结果才能被其他处理器上的goroutine看到。
在一个独立的goroutine中每个语句的执行顺序是可以保证的,也就是说goroutine是顺序连贯的,但是在不使用channel且不使用mutex这样的显示同步操作时,没法保证事件在不同的goroutine中看到的顺序是一致的。如果goroutine在不同的CPU上执行,每个核心都有自己的缓存,一个goroutine的写入对于其他的goroutine的print在主存同步之前是不可见的。所有并发问题都可以用一致的,简单的既定的模式来规避,尽可能将变量限定在goroutine内部,如果是多个goroutine都需要访问的变量,使用互斥条件来访问。
初始化延迟是比较常用的一种处理方式,但是在多协程中如果进行延迟初始化可能会出现互斥问题。sync包提供可一个专门的方案来解决这种一次性初始化的问题:sync.Once,从概念上讲一次性的初始化需要一个互斥量mutex和一个boolean变量来记录初始化是否已经完成。互斥量用来保护boolean变量和客户端数据结构。Do这个唯一的方法需要接收初始化函数作为其参数。这种方法的同步保证了所有的goroutine能够看到给变量的更新。
Go的runtime和工具链提供了一个复杂但很好用的动态分析工具,竞争检查器。只要在go build, go run或者go test 命令后加上-race的flag,就会使编译器创建一个应用的修改不版或者一个附带了能够记录所有运行期对共享变量访问工具的test。并且会记录下每个读或写共享变量的goroutine的身份信息。竞争检查器会报告所有的已经发生的数据竞争,但只能检测到运行时的竞争条件。
channel在关闭后,若其他的goroutine再去操作该channel将不再阻塞,而是直接返回,这样就达到了向其他goroutine广播的功能,通常结合select进行广播的抓取。
通常可以结合sync包和channel一起实现功能强大的并发逻辑,如下实现的一个高性能缓存:
package memo
import (
"sync"
)
type entry struct {
res result
ready chan struct{} //用于广播其他的goroutine该实例初始化完成,可以正式使用
}
type Memo struct {
f Func
mu sync.Mutex
cache map[string]result
}
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
func (memo *Memo) Get(key string) {
memo.mu.Lock() //获取对应的锁
e := memo.cache[key] //获取对应的key对应的值
if e == nil { //如没有获取到,则创建一个新的实例
e = &entry{ready: make(chan struct{})} //保存到cache中,但里面的实际数据还没有进行初始化
memo.cache[key] = e //保存e
memo.mu.Unlock() //是否对应的锁
e.res.value, e.res.err = memo.f(key) //此时调用外部的不可控函数创建对应的信息,可能时间很长,导致长时间挂起
close(e.ready) //关闭该实例中就绪的channel
} else {
memo.mu.Unlock() //释放锁
<-e.ready //读取channel,在关闭情况下不会堵塞,但在不关闭的情况下会阻塞直到该实例初始化完成
}
return e.res.value, e.res.err
}
以上的代码不会反复的进行创建对象,而且保证了并发性和高效性。以上是sync和channel结合的处理方式,实际上可以完全采用channel的形式实现类似的功能。一个缓存服务实际上可以从以下的模式进行开发:缓存服务实际就是一个服务器,服务器分为接收请求部分,数据处理部分,应答发送部分。以上三个部分可以通过三个goroutine分别处理,之间通过channel进行通信。
type Func func(key string)(interface {}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} //用于保证实例可用,关闭ready后再操作不会阻塞
}
type request struct {
key string //请求的key
response chan <-result //发送应答的channel
}
type Memo struct {requests chan request} //接收请求的channel
func New(f Func)*Memo {
memo := &Memo{requests: make(chan request)} //创建一个Memo对象
go memo.server(f) //启动对应的请求goroutine
return memo
}
func (memo *Memo)Get(key string)(interface{}, error){
response := make(chan result) //创建该请求的应答channel
memo.requests <- request{key, response} //创建请求对象,并发送给请求channel
res := <- response //接收该请求的应答管道的数据,会阻塞直到数据准备就绪
return res.value, res.err //返回准备就绪的数据
}
func (memo *Memo)Close(){ //关闭缓存服务的功能
close(memo.requests)
}
//接下来构建整个服务器的三个部分,服务器请求
func (memo *Memo) server(f Func) { //缓存服务的实现,数据由该server统一维护
cache := make(map[string]*entry) //实际的键值对缓存
for req := range memo.requests { //从请求channel中读取请求
e := cache[req.key] //查找是否存在
if e == nil { //若不存在,则创建
e = &entry{ready: make(chan struct{})} //创建好对应的实例
cache[req.key] = e //缓存对应的实例
go e.call(f, req.key) //调用实例自身的函数创建数据,并将对应的ready管道close
}
go e.deliver(req.response) //发送对应的应答
}
}
//数据处理部分,产生对应的消息
func (e *entry) call(F func, key string) {
e.res.value, e.res.err = f(key) //调用函数产生cache
close(e.ready) //关闭准备就绪管道
}
//应答数据的部分
func (e *entry) deliver(response chan <- result) {
<- e.ready //等待该实例准备就绪的信号
response <-e.res //将对应的缓存信息发送到应答管道
}
线程和协程的区别
每个OS线程都有一个固定大小的内存块(2M)来做栈,这个栈会用来存储当前正在被调用或挂起(指在调用其他函数时)的函数的内部变量,2MB的栈对于一个小小的goroutine来说是很大的内存浪费。对于go程序来说创建成百上千个goroutine是非常普遍的。因此每个goroutine的栈空间很小。一个goroutine会以很小的栈开始生命周期,一般只需要2KB,和操作系统线程一样,会保存期获取或挂起的函数调用的本地变量,但和OS线程不一样的是一个goroutine的栈大小并不是固定的,栈的大小会根据需要动态的伸缩,而goroutine的栈大小最大值有1GB,比传统的固定大小的线程栈要大很多,但一般情况下goroutine不需要太大的栈。
OS线程会被操作系统内核调度,一个硬件计时器会中断处理器,会触发内核的调度器进行调度。调度器会检查线程列表决定下一次哪个线程可以被运行,会进行现场保存和恢复等一系列的操作。这种操作涉及到切换,因此操作相对会比较慢。而Go的运行时包含了自己的调度器,这个调度器使用了一些技术手段,如m:n调度,会在n个操作系统线程上多工调度m个goroutine。Go调度器的工作和内核的调度相似,但是这个调度器只关注单独的goroutine。Go调度器并不是用硬件定时器而是被Go语言本身进行调度,也就是应用态的调度。当goroutine调用了time.Sleep或者被channel等阻塞时,调度器会使其进入休眠状态,并开始执行另一个goroutine直到时机到了再去唤醒第一个goroutine。这种调度方式并不需要进入内核的上下文,所以重新调度一个goroutine比调度一个线程代价要低很多。
Go的调度器使用了一个叫做GOMAXPROCS的变量来决定会有多少个操作系统的线程同时执行Go的代码。默认值是运行机器上的CPU的核心数。在I/O或者系统调用非Go语言函数时,是需要一个对应的操作系统线程的,但GOMAXPROCS并不需要将这些情况计数在内。可以用GOMAXPROCS的环境变量显示控制这个参数。也可以在运行时调用runtime.GOMAXPROCS函数来修改。
操作系统的线程都有一个独特的身份ID。这个信息可以以一个普通的形式被容易地获取。而goroutine没有可以被程序员获取的身份ID概念。
阅读(4668) | 评论(0) | 转发(0) |