GO语言实战之并发和 goroutine

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》

写在前面


  • 嗯,学习GO,所以有了这篇文章
  • 博文内容为《GO语言实战》读书笔记之一
  • 作者写的太好了,所以全章摘录
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


内容摘要:

  • 并发是指 goroutine 运行的时候是相互独立的。
  • 使用关键字 go 创建 goroutine 来运行函数。
  • goroutine 在逻辑处理器上执行,而逻辑处理器具有独立的系统线程和运行队列
  • 竞争状态是指两个或者多个 goroutine 试图访问同一个资源。
  • 原子函数互斥锁提供了一种防止出现竞争状态的办法。
  • 通道提供了一种在两个 goroutine 之间共享数据的简单方法。
  • 无缓冲的通道保证同时交换数据,而有缓冲的通道不做这种保证。

并发

编码中,并行执行多个任务会有更大的好处。一个例子是:

Web 服务需要在各自独立的套接字(socket)上同时接收多个数据请求。每个套接字请求都是独立的,可以完全独立于其他套接字进行处理。具有并行执行多个请求的能力可以显著提高这类系统的性能。

考虑到这一点,Go 语言的语法和运行时直接内置了对并发的支持。

Go 语言里的并发指的是能让某个函数独立于其他函数运行的能力。当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行

Go 语言 运行时的调度器是一个复杂的软件,能管理被创建的所有 goroutine 并为其分配执行时间。这个调度器在操作系统之上,将操作系统的线程与语言运行时的逻辑处理器绑定,并在逻辑处理器上运行 goroutine

调度器在任何给定的时间,都会全面控制哪个 goroutine 要在哪个逻辑处理器上运行

Go 语言的并发同步模型来自一个叫作通信顺序进程(Communicating Sequential Processes,CSP)的范型(paradigm)。

CSP 是一种消息传递模型,通过在 goroutine 之间传递数据传递消息,而不是对数据进行加锁来实现同步访问。用于在 goroutine 之间同步和传递数据的关键数据类型叫作通道(channel)

使用通道可以使编写并发程序更容易,也能够让并发程序出错更少

并发与并行

操作系统会在物理处理器上调度线程来运行,而 Go 语言的运行时会在逻辑处理器上调度 goroutine 来运行。每个逻辑处理器都分别绑定到单个操作系统线程

1.5 版本 Go语言的运行时默认会为每个可用的物理处理器分配一个逻辑处理器

1.5 版本之前的版本中,默认给整个应用程序只分配一个逻辑处理器。这些逻辑处理器会用于执行所有被创建的goroutine。即便只有一个逻辑处理器,Go也可以以神奇的效率和性能,并发调度无数个goroutine。

 6-2

在图 6-2 中,可以看到操作系统线程、逻辑处理器和本地运行队列之间的关系。

如果创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调度器的全局运行队列中。之后,调度器就将这些队列中的 goroutine 分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。本地运行队列中的 goroutine 会一直等待直到自己被分配的逻辑处理器执行.

正在运行的 goroutine 需要执行一个阻塞的系统调用,如打开一个文件。当这类调用发生时,线程和 goroutine 会从逻辑处理器上分离,该线程会继续阻塞,等待系统调用的返回。

与此同时,这个逻辑处理器就失去了用来运行的线程。所以,调度器会创建一个新线程,并将其绑定到该逻辑处理器上。之后,调度器会从本地运行队列里选择另一个 goroutine 来运行。

一旦被阻塞的系统调用执行完成并返回,对应的 goroutine 会放回到本地运行队列,而之前的线程会保存好,以便之后可以继续使用。

在很多情况下,并发的效果比并行好,因为操作系统和硬件的总资源一般很少,但能支持系统同时做很多事情。这种“使用较少的资源做更多的事情”的哲学,也是指导 Go 语言设计的哲学

如果希望让 goroutine 并行,必须使用多于一个逻辑处理器。当有多个逻辑处理器时,调度器会将 goroutine 平等分配到每个逻辑处理器上。这会让 goroutine 在不同的线程上运行。

不过要想真的实现并行的效果,用户需要让自己的程序运行在有多个物理处理器的机器上。否则,哪怕 Go 语言运行时使用多个线程,goroutine 依然会在同一个物理处理器上并发运行,达不到并行的效果

在这里插入图片描述

goroutine

创建两个 goroutine,以并发的形式分别显示大写和小写的英文字母

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"runtime"
"sync"
)
// main 是所有 Go 程序的入口
func main() {
// 分配一个逻辑处理器给调度器使用
runtime.GOMAXPROCS(1)
// wg 用来等待程序完成
// 计数加 2,表示要等待两个 goroutine
var wg sync.WaitGroup
wg.Add(2)

fmt.Println("Start Goroutines")
// 声明一个匿名函数,并创建一个 goroutine
go func() {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()

// 显示字母表 3 次
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()

// 声明一个匿名函数,并创建一个 goroutine
go func() {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()

// 显示字母表 3 次
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
// 等待 goroutine 结束
fmt.Println("等待 goroutine 结束")
wg.Wait()
fmt.Println("\nTerminating Program")
}

调用了 runtime 包的 GOMAXPROCS 函数。这个函数允许程序更改调度器可以使用的逻辑处理器的数量。如果不想在代码里做这个调用,也可以通过修改和这 个函数名字一样的环境变量的值来更改逻辑处理器的数量。给这个函数传入 1,是通知调度器只能为该程序使用一个逻辑处理器

两个函数分别通过关键字 go 创建 goroutine 来执行

1
2
3
4
Start Goroutines
等待 goroutine 结束
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
Terminating Program

第一个 goroutine 完成所有显示需要花时间太短了,以至于在调度器切换到第二个 goroutine 之前,就完成了所有任务。这也是为什么会看到先输出了所有的大写字母

main 函数通过 WaitGroup,等待两个 goroutine 完成它们的工作, WaitGroup 是一个计数信号量,可以用来记录并维护运行的 goroutine

如果 WaitGroup 的值大于 0,Wait 方法就会阻塞。创建了一个 WaitGroup 类型的变量,之后在将这个 WaitGroup 的值设置为 2,表示有两个正在运行的 goroutine。

为了减小 WaitGroup 的值并最终释放 main 函数,使用 defer 声明在函数退出时调用 Done 方法, 关键字 defer 会修改函数调用时机,在正在执行的函数返回时才真正调用 defer 声明的函数。

在 Go 语言中,sync.WaitGroup 用于协调并发任务的完成``。WaitGroup 提供了三个方法:Add()、Done() 和 Wait()

  • 当你启动一个 Goroutine 时,可以调用 Add(1) 来增加等待的 Goroutine 数量。
  • 在 Goroutine 完成时,通过调用 Done() 来减少等待的 Goroutine 数量。
  • 在主 Goroutine 中,可以调用 Wait() 来阻塞,直到所有的 Goroutine 完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup

// 启动两个 Goroutine
wg.Add(2)
go worker(1, &wg)
go worker(2, &wg)

// 等待所有 Goroutine 完成
wg.Wait()

fmt.Println("所有 Goroutine 已完成")
}

func worker(id int, wg *sync.WaitGroup) {
// 在函数结束时通知 WaitGroup 完成
defer wg.Done()

fmt.Printf("Worker %d 正在执行\n", id)
}

基于调度器的内部算法,一个正运行的 goroutine 在工作结束前,可以被停止并重新调度

调度器这样做的目的是防止某个 goroutine 长时间占用逻辑处理器。当 goroutine 占用时间过长时,调度器会停止当前正运行的 goroutine,并给其他可运行的 goroutine 运行的机会。

在这里插入图片描述

以给每个可用的物理处理器在运行的时候分配一个逻辑处理器 runtime.GOMAXPROCS(runtime.NumCPU())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"runtime"
"sync"
)

// 创建一个协调器
var wg sync.WaitGroup

// main is the entry point for all Go programs.
func main() {
// 分配一个逻辑处理器处理
runtime.GOMAXPROCS(runtime.NumCPU())

// 有两个 goroutine 要运行等待
wg.Add(2)

// 创建两个 goroutine
fmt.Println("Create Goroutines")
go printPrime("A")
go printPrime("B")

// 等待 goroutine 结束
fmt.Println("Waiting To Finish")
wg.Wait()

fmt.Println("Terminating Program")
}

// printPrime 显示 5000 以内的素数值
func printPrime(prefix string) {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()

next:
for outer := 2; outer < 5000; outer++ {
for inner := 2; inner < outer; inner++ {
if outer%inner == 0 {
continue next
}
}
//time.Sleep(1 * time.Second) // 睡眠2秒
fmt.Printf("%s:%d\n", prefix, outer)
}
fmt.Println("Completed", prefix)
}

需要强调的是,使用多个逻辑处理器并不意味着性能更好。在修改任何语言运行时配置参数的时候,都需要配合基准测试来评估程序的运行效果

只有在有多个逻辑处理器且可以同时让每个 goroutine 运行在一个可用的物理处理器上的时候,goroutine 才会并行运行

竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)

对一个共享资源的读和写操作必须是原子化的,换句话说,同一时刻只能有一个 goroutine 对共享资源进行读和写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"fmt"
"runtime"
"sync"
)

var (
// counter 是所有 goroutine 都要增加其值的变量
counter int
wg sync.WaitGroup
)

func main() {
wg.Add(2)
go incCounter(1)
go incCounter(2)
wg.Wait()
fmt.Println("Final Counter:", counter)
}

// incCounter 增加包里 counter 变量的值
func incCounter(id int) {
defer wg.Done()

for count := 0; count < 2; count++ {
value := counter
// 当前 goroutine 从线程退出,并放回到队列
runtime.Gosched()
value++
// 将该值保存回 counter
counter = value
}
}

每个 goroutine 都会覆盖另一个 goroutine 的工作。这种覆盖发生在 goroutine 切换的时候。每个 goroutine 创造了一个 counter 变量的副本,之后就切换到另一个 goroutine。当这个 goroutine 再次运行的时候,counter 变量的值已经改变了,但是 goroutine 并没有更新自己的那个副本的值,而是继续使用这个副本的值,用这个值递增,并存回 counter 变量,结果覆盖了另一个 goroutine 完成的工作。

在这里插入图片描述

Go调度器会在 goroutine 发生阻塞、IO操作、函数调用等情况下自动进行调度切换,而无需显式调用 runtime.Gosched()

Go 语言有一个特别的工具,可以在代码里检测竞争状态。在查找这类错误的时候,这个工具非常好用,尤其是在竞争状态并不像这个例子里这么明显的时候。让我们用这个竞争检测器来检测一下我们的例子代码,

1
go build -race // 用竞争检测器标志来编译程序
1
2
3
4
counter = value
value := counter
go incCounter(1)
go incCounter(2)

这几行代码分别是对 counter 变量的读和写操作

一种修正代码、消除竞争状态的办法是,使用Go 语言提供的锁机制,来锁住共享资源,从而保证 goroutine 的同步状态

锁住共享资源

Go 语言提供了传统的同步 goroutine 的机制,就是对共享资源加锁。如果需要顺序访问一个,整型变量或者一段代码,atomic 和 sync包里的函数提供了很好的解决方案。 atomic 包里的几个函数以及 sync 包里的 mutex 类型。

原子函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 这个示例程序展示如何使用 atomic 包来提供
package main
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)
var (
// counter 是所有 goroutine 都要增加其值的变量
counter int64
// wg 用来等待程序结束
wg sync.WaitGroup
)
// main 是所有 Go 程序的入口
func main() {
wg.Add(2)
go incCounter(1)
go incCounter(2)
wg.Wait()
fmt.Println("Final Counter:", counter)
}

func incCounter(id int) {
defer wg.Done()
for count := 0; count < 2; count++ {
// 安全地对 counter 加 1
atomic.AddInt64(&counter, 1)

// 当前 goroutine 从线程退出,并放回到队列
runtime.Gosched()
}
}

使用了 atmoic 包的 AddInt64 函数。这个函数会同步整型值的加法,方法是强制同一时刻只能有一个 goroutine 运行并完成这个加法操作

goroutine 试图去调用任何原子函数时,这些 goroutine 都会自动根据所引用的变量做同步处理。

另外两个有用的原子函数是 LoadInt64StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var (
shutdown int64
wg sync.WaitGroup
)

func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(1 * time.Second)
fmt.Println("Shutdown Now")
// 该停止工作了,安全地设置 shutdown 标志
atomic.StoreInt64(&shutdown, 1)
wg.Wait()
}

func doWork(name string) {
// Schedule the call to Done to tell main we are done.
defer wg.Done()

for {
fmt.Printf("Doing %s Work\n", name)
//给定 goroutine 执行的时间
time.Sleep(250 * time.Millisecond)

// 读取工作标识,判断当前工作是否完成,要停止工作了吗?
if atomic.LoadInt64(&shutdown) == 1 {
fmt.Printf("Shutting %s Down\n", name)
break
}
}
}

goroutine 会使用 LoadInt64 来检查 shutdown 变量的值。这个函数会安全地返回 shutdown 变量的一个副本。如果这个副本的值为 1,goroutine 就会跳出循环并终止。

main 函数使用 StoreInt64 函数来安全地修改 shutdown 变量的值。如果哪个 doWork goroutine 试图在 main 函数调用 StoreInt64 的同时调用 LoadInt64 函数,那么原子函数会将这些调用互相同步,保证这些操作都是安全的,不会进入竞争状态

类似 Java 的原子基本类型,这里可以结合理解,AtomicBoolean,AtomicInteger,AtomicLong

互斥锁

另一种同步访问共享资源的方式是使用互斥锁(mutex)互斥锁这个名字来自互斥(mutualexclusion)的概念。互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以执行这个临界区代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"runtime"
"sync"
)

var (
counter int
wg sync.WaitGroup
// mutex 用来定义一段代码临界区
mutex sync.Mutex
)

func main() {
wg.Add(2)

go incCounter(1)
go incCounter(2)

// Wait for the goroutines to finish.
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}

// incCounter 使用互斥锁来同步并保证安全访问,
// 增加包里 counter 变量的值
func incCounter(id int) {
defer wg.Done()

for count := 0; count < 2; count++ {
mutex.Lock()
{
value := counter
runtime.Gosched()
value++
counter = value
}
mutex.Unlock()
}
}

Lock()和 Unlock() 函数调用定义的临界区里被保护起来。使用大括号只是为了让临界区看起来更清晰,并不是必需的。同一时刻只有一个 goroutine 可以进入临界区。之后,直到调用 Unlock()函数之后,其他 goroutine 才能进入临界区。 可以结合 Java 的 ReentrantLock 来理解。都属于互斥的可重入锁.

通道

在 Go 语言里,你不仅可以使用原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针

在 Go 语言中需要使用内置函数 make 来创建一个通道

1
2
3
4
5
// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)

make 的第一个参数需要是关键字 chan,之后跟着允许通道交换的数据的类型。如果创建的是一个有缓冲的通道,之后还需要在第二个参数指定这个通道的缓冲区的大小。

向通道发送值或者指针需要用到<-操作符

1
2
3
4
5
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串
buffered <- "Gopher"

创建了一个有缓冲的通道,数据类型是字符串,包含一个 10 个值的缓冲区。之后我们通过通道发送字符串”Gopher”。

为了让另一个 goroutine 可以从该通道里接收到这个字符串,我们依旧使用<-操作符,但这次是一元运算符,当从通道里接收一个值或者指针时,<-运算符在要操作的通道变量的左侧

1
2
// 从通道接收一个字符串
value := <- buffered

通道是否带有缓冲,其行为会有一些不同。理解这个差异对决定到底应该使用还是不使用缓冲很有帮助。下面我们分别介绍一下这两种类型。

无缓冲的通道

无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。

这种类型的通道要求发送 goroutine接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

在这里插入图片描述

在网球比赛中,两位选手会把球在两个人之间来回传递。选手总是处在以下两种状态之一:要么在等待接球,要么将球打向对方。可以使用两个 goroutine 来模拟网球比赛,并使用无缓冲的通道来模拟球的来回.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

var wg sync.WaitGroup

func init() {
rand.Seed(time.Now().UnixNano())
}

// main is the entry point for all Go programs.
func main() {
// 创建一个无缓冲的通道
court := make(chan int)

// Add a count of two, one for each goroutine.
wg.Add(2)

// 启动两个选手
go player("Nadal", court)
go player("Djokovic", court)

// 发球
court <- 1

// Wait for the game to finish.
wg.Wait()
}

// player 模拟一个选手在打网球
func player(name string, court chan int) {
// 在函数退出时调用 Done 来通知 main 函数工作已经完成
defer wg.Done()

for {
// 等待球被击打过来
ball, ok := <-court
if !ok {
// 如果通道被关闭,我们就赢了
fmt.Printf("Player %s Won\n", name)
return
}

// 选随机数,然后用这个数来判断我们是否丢球
n := rand.Intn(100)
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)

// close()是一个内建函数,用于关闭一个通道
close(court)
return
}

// / 显示击球数,并将击球数加 1
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++

// / 将球打向对手
court <- ball
}
}

==========
Player Djokovic Hit 1
Player Nadal Hit 2
Player Djokovic Hit 3
Player Nadal Hit 4
Player Djokovic Hit 5
Player Nadal Hit 6
Player Djokovic Hit 7
Player Nadal Hit 8
Player Djokovic Missed
Player Nadal Won

Process finished with the exit code 0

4 个跑步者围绕赛道轮流跑,第二个、第三个和第四个跑步者要接到前一位跑步者的接力棒后才能起跑。比赛中最重要的部分是要传递接力棒,要求同步传递。在同步接力棒的时候,参与接力的两个跑步者必须在同一时刻准备好交接.

非常不错的一个 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 4 个 goroutine 间的接力比赛
package main

import (
"fmt"
"sync"
"time"
)

// wg 用来等待程序结束
var wg sync.WaitGroup

// / main 是所有 Go 程序的入口
func main() {
// 创建一个无缓冲的通道
baton := make(chan int)

// 为最后一位跑步者将计数加 1
wg.Add(1)

// 表示第一位跑步者来到跑道
go Runner(baton)

// 开始比赛,为第一位跑步者提供接力棒
baton <- 1

// 等待接力结束
wg.Wait()
}

// Runner 模拟接力比赛中的一位跑步者
func Runner(baton chan int) {
var newRunner int

// 等待接力棒
runner := <-baton

// 开始绕着跑道跑步
fmt.Printf("当前接力者 %d 开始跑步\n", runner)

// 创建下一位跑步者,这里是一个递归,实际上模拟其他选手到达指定接力位置
if runner != 4 {
newRunner = runner + 1
fmt.Printf("当前选手 %d 开始接力\n", newRunner)
go Runner(baton)
}
// 所有选手到达指定位置

// 围绕跑道跑
time.Sleep(100 * time.Millisecond)

// 当所有人跑完结束比赛
if runner == 4 {
fmt.Printf("最后一位 %d 跑完了, Race Over\n", runner)
wg.Done()
return
}
// 跑完之后交接接力棒

// 将接力棒交给下一位跑步者
fmt.Printf("接力棒由 %d 交给了 %d\n",
runner,
newRunner)
baton <- newRunner
}
=========
C:\Users\liruilong\AppData\Local\JetBrains\GoLand2023.2\tmp\GoLand\___go_build_example_com_m_chapter6_listing22.exe
当前接力者 1 开始跑步
当前选手 2 开始接力
====== 接力棒由 1 交给了 2
当前接力者 2 开始跑步
当前选手 3 开始接力
====== 接力棒由 2 交给了 3
当前接力者 3 开始跑步
当前选手 4 开始接力
====== 接力棒由 3 交给了 4
当前接力者 4 开始跑步
最后一位 4 跑完了, Race Over

Process finished with the exit code 0

这里有个误区,第一眼看这个代码,可能会把 go Runner(baton) 当作一个递归方法来理解,这里使用了, 所以本质上是不同的 goroutine , goroutine 导致当前住进程并不会直接进入被声明函数栈里面。而是会顺序执行。 直到发生了通道操作,然后会进入到其他的 goroutine,也就是说,在第一个人进行跑步时,其他的通道一直时阻塞状态。

有缓冲的通道

有缓冲的通道(buffered channel) 是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。

  • 只有在通道中没有接收的值时,接收动作才会阻塞
  • 只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。

在这里插入图片描述

可以看到有缓存的通道情况下,两个操作既不是同步的,也不会互相阻塞,即通道两侧的读写发生是没有直接关系的。

使用有缓冲的通道的例子,这个例子管理一组 goroutine 来接收并完成工作。有缓冲的通道提供了一种清晰而直观的方式来实现这个功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 有缓冲的通道和固定数目的 goroutine 来处理一堆工作
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

const (
numberGoroutines = 4 // 要使用的 goroutine 的数量
taskLoad = 10 // 要处理的工作的数量
)

// wg 用来等待程序完成
var wg sync.WaitGroup

// init is called to initialize the package by the
// Go runtime prior to any other code being executed.
func init() {
// 初始化随机数种子
rand.Seed(time.Now().Unix())
}

// main 是所有 Go 程序的入口
func main() {
// 创建一个有缓冲的通道来管理工作
tasks := make(chan string, taskLoad)

//使用这么多 goroutine 来处理
wg.Add(numberGoroutines)
for gr := 1; gr <= numberGoroutines; gr++ {
// 启动每个 goroutine 来处理工作
go worker(tasks, gr)
}

// 增加一组要完成的工作,这里一次性放入了所有的通道数据
for post := 1; post <= taskLoad; post++ {
tasks <- fmt.Sprintf("Task : %d", post)
}

// 当所有工作都处理完时关闭通道
// 以便所有 goroutine 退出
close(tasks)

// 等待所有工作完成
wg.Wait()
}

// worker 作为 goroutine 启动来处理
// 从有缓冲的通道传入的工作
func worker(tasks chan string, worker int) {
// 通知函数已经返回
defer wg.Done()

for {
// 等待分配工作
task, ok := <-tasks
if !ok {
// 这意味着通道已经空了,并且已被关闭
fmt.Printf("员工: %d : 结束了工作\n", worker)
return
}
// 分配工作显示我们开始工作了
fmt.Printf("员工: %d : 开始执行任务 %s\n", worker, task)
// 随机等一段时间来模拟工作
sleep := rand.Int63n(100)
time.Sleep(time.Duration(sleep) * time.Millisecond)

// 显示我们完成了工作
fmt.Printf("员工: %d : 完成了任务 %s\n", worker, task)
}
}

这里类似队列一样,生成四个 goroutine 在有缓存通道里面拿到数据处理数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
C:\Users\liruilong\AppData\Local\JetBrains\GoLand2023.2\tmp\GoLand\___go_build_example_com_m_chapter6_listing24.exe
员工: 1 : 开始执行任务 Task : 3
员工: 3 : 开始执行任务 Task : 4
员工: 2 : 开始执行任务 Task : 1
员工: 4 : 开始执行任务 Task : 2
员工: 1 : 完成了任务 Task : 3
员工: 1 : 开始执行任务 Task : 5
员工: 2 : 完成了任务 Task : 1
员工: 2 : 开始执行任务 Task : 6
员工: 2 : 完成了任务 Task : 6
员工: 2 : 开始执行任务 Task : 7
员工: 3 : 完成了任务 Task : 4
员工: 3 : 开始执行任务 Task : 8
员工: 4 : 完成了任务 Task : 2
员工: 4 : 开始执行任务 Task : 9
员工: 1 : 完成了任务 Task : 5
员工: 1 : 开始执行任务 Task : 10
员工: 1 : 完成了任务 Task : 10
员工: 4 : 完成了任务 Task : 9
员工: 4 : 结束了工作
员工: 1 : 结束了工作
员工: 3 : 完成了任务 Task : 8
员工: 3 : 结束了工作
员工: 2 : 完成了任务 Task : 7
员工: 2 : 结束了工作

Process finished with the exit code 0

博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,如果你认可它,不要吝啬星星哦 :)


《GO语言实战》


© 2018-至今 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

发布于

2023-09-19

更新于

2024-11-22

许可协议

评论
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×