Select 语句是 Go 语言中专门用于处理 Channel 操作的控制结构。它可以同时监听多个 Channel 的读写操作,当其中任意一个 Channel 准备好时,就会执行对应的 case 分支。
在实际的并发编程中,我们经常需要同时处理多个 Channel:
如果没有 Select,我们需要为每个 Channel 创建一个 Goroutine,这会让代码变得复杂且难以管理。Select 提供了一种优雅的方式来处理这些场景。
select {
case <-channel1:
// 从 channel1 接收到数据时执行
case data := <-channel2:
// 从 channel2 接收到数据时执行,data 是接收到的值
case channel3 <- value:
// 向 channel3 发送数据成功时执行
default:
// 如果没有任何 Channel 准备好,执行默认操作
}
虽然 Select 和 Switch 看起来很像,但它们有本质区别:
| 特性 | Select | Switch |
|---|---|---|
| 用途 | 专门处理 Channel 操作 | 处理各种类型的条件判断 |
| case 条件 | 必须是 Channel 的发送或接收操作 | 可以是任何类型的值比较 |
| 执行方式 | 随机选择一个就绪的 case | 从上到下匹配第一个符合条件的 case |
| 阻塞特性 | 如果没有 case 就绪会阻塞 | 不会阻塞 |
让我们通过一个完整的例子来理解 Select 的基本用法:
package main
import (
"fmt"
"time"
)
func main() {
// 创建两个 Channel
ch1 := make(chan string)
ch2 := make(chan string)
// 启动两个 Goroutine,分别向两个 Channel 发送数据
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 Channel 1 的消息"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 Channel 2 的消息"
}()
// 使用 Select 监听两个 Channel
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("收到:", msg1)
case msg2 := <-ch2:
fmt.Println("收到:", msg2)
}
}
}
输出结果:
收到: 来自 Channel 1 的消息
收到: 来自 Channel 2 的消息
当多个 Channel 同时准备好时,Select 会随机选择一个执行:
package main
import (
"fmt"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
// 同时向两个 Channel 发送数据
go func() {
ch1 <- "Channel 1"
ch2 <- "Channel 2"
}()
// 多次执行 Select,观察随机选择
for i := 0; i < 10; i++ {
select {
case msg1 := <-ch1:
fmt.Printf("第%d次: 选择了 ch1 - %s\n", i+1, msg1)
case msg2 := <-ch2:
fmt.Printf("第%d次: 选择了 ch2 - %s\n", i+1, msg2)
}
}
}
每次运行的结果可能不同,这体现了 Select 的随机选择特性。
超时处理是 Select 最常用的场景之一。通过结合 time.After 函数,我们可以为 Channel 操作设置超时时间。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
// 模拟一个耗时的操作
go func() {
time.Sleep(3 * time.Second) // 假设这个操作需要 3 秒
ch <- "操作完成"
}()
select {
case result := <-ch:
fmt.Println("收到结果:", result)
case <-time.After(2 * time.Second): // 设置 2 秒超时
fmt.Println("操作超时!")
}
}
输出结果:
操作超时!
package main
import (
"fmt"
"net/http"
"time"
)
func main() {
// 创建一个 Channel 用于接收结果
result := make(chan string)
// 启动 Goroutine 发送 HTTP 请求
go func() {
resp, err := http.Get("https://httpbin.org/delay/3")
if err != nil {
result <- fmt.Sprintf("请求失败: %v", err)
return
}
defer resp.Body.Close()
result <- fmt.Sprintf("请求成功,状态码: %d", resp.StatusCode)
}()
// 等待结果或超时
select {
case res := <-result:
fmt.Println(res)
case <-time.After(2 * time.Second):
fmt.Println("HTTP 请求超时")
}
}
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
timer := time.NewTimer(2 * time.Second)
go func() {
time.Sleep(1 * time.Second)
ch <- "数据已准备好"
}()
select {
case msg := <-ch:
timer.Stop() // 收到数据后停止计时器
fmt.Println("收到:", msg)
case <-timer.C:
fmt.Println("超时了")
}
}
默认情况下,Channel 的发送和接收操作都是阻塞的。使用 Select 配合 default 分支可以实现非阻塞操作。
package main
import "fmt"
func main() {
ch := make(chan string, 1)
// 尝试非阻塞接收
select {
case msg := <-ch:
fmt.Println("收到消息:", msg)
default:
fmt.Println("没有消息可接收")
}
// 发送一条消息
ch <- "Hello"
// 再次尝试非阻塞接收
select {
case msg := <-ch:
fmt.Println("收到消息:", msg)
default:
fmt.Println("没有消息可接收")
}
}
输出结果:
没有消息可接收
收到消息: Hello
package main
import "fmt"
func main() {
ch := make(chan string, 1) // 缓冲区大小为 1
// 第一次发送成功
select {
case ch <- "消息1":
fmt.Println("发送消息1成功")
default:
fmt.Println("发送消息1失败,Channel 已满")
}
// 第二次发送失败(缓冲区已满)
select {
case ch <- "消息2":
fmt.Println("发送消息2成功")
default:
fmt.Println("发送消息2失败,Channel 已满")
}
// 接收消息
fmt.Println("接收:", <-ch)
// 再次尝试发送
select {
case ch <- "消息3":
fmt.Println("发送消息3成功")
default:
fmt.Println("发送消息3失败,Channel 已满")
}
}
package main
import (
"fmt"
"time"
)
type MessageQueue struct {
messages chan string
}
func NewMessageQueue(size int) *MessageQueue {
return &MessageQueue{
messages: make(chan string, size),
}
}
// 非阻塞发送
func (mq *MessageQueue) TrySend(msg string) bool {
select {
case mq.messages <- msg:
return true
default:
return false
}
}
// 非阻塞接收
func (mq *MessageQueue) TryReceive() (string, bool) {
select {
case msg := <-mq.messages:
return msg, true
default:
return "", false
}
}
func main() {
mq := NewMessageQueue(3)
// 尝试发送多条消息
messages := []string{"消息A", "消息B", "消息C", "消息D", "消息E"}
for _, msg := range messages {
if mq.TrySend(msg) {
fmt.Printf("发送成功: %s\n", msg)
} else {
fmt.Printf("发送失败: %s (队列已满)\n", msg)
}
}
// 尝试接收消息
fmt.Println("\n开始接收消息:")
for {
if msg, ok := mq.TryReceive(); ok {
fmt.Printf("接收成功: %s\n", msg)
} else {
fmt.Println("队列为空,退出")
break
}
}
}
在实际应用中,我们经常需要在循环中使用 Select 来持续监听多个 Channel。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
stop := make(chan bool)
// 数据生产者 1
go func() {
for i := 0; i < 3; i++ {
time.Sleep(500 * time.Millisecond)
ch1 <- fmt.Sprintf("Ch1-消息%d", i+1)
}
}()
// 数据生产者 2
go func() {
for i := 0; i < 3; i++ {
time.Sleep(700 * time.Millisecond)
ch2 <- fmt.Sprintf("Ch2-消息%d", i+1)
}
}()
// 定时停止
go func() {
time.Sleep(3 * time.Second)
stop <- true
}()
// 持续监听
for {
select {
case msg1 := <-ch1:
fmt.Println("收到 Ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("收到 Ch2:", msg2)
case <-stop:
fmt.Println("收到停止信号,退出循环")
return
}
}
}
package main
import (
"fmt"
"time"
)
func main() {
jobs := make(chan int, 5)
results := make(chan int, 5)
done := make(chan bool)
// 生产者:发送任务
go func() {
for i := 1; i <= 5; i++ {
jobs <- i
}
close(jobs)
}()
// 消费者:处理任务
go func() {
for job := range jobs {
time.Sleep(200 * time.Millisecond)
results <- job * 2
}
close(results)
}()
// 结果收集器
go func() {
for result := range results {
fmt.Printf("结果: %d\n", result)
}
done <- true
}()
// 等待完成或超时
select {
case <-done:
fmt.Println("所有任务处理完成")
case <-time.After(3 * time.Second):
fmt.Println("处理超时")
}
}
package main
import (
"fmt"
"time"
)
func main() {
data := make(chan string)
heartbeat := time.NewTicker(500 * time.Millisecond)
defer heartbeat.Stop()
// 模拟数据生产
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Duration(i+1) * time.Second)
data <- fmt.Sprintf("数据%d", i+1)
}
close(data)
}()
// 监听数据和心跳
for {
select {
case msg, ok := <-data:
if !ok {
fmt.Println("数据通道已关闭,退出")
return
}
fmt.Println("收到数据:", msg)
case <-heartbeat.C:
fmt.Println("心跳... (系统正常运行)")
}
}
}
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
"time"
)
func main() {
// 创建上下文,用于优雅退出
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 监听系统信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 工作通道
jobs := make(chan string)
// 启动工作协程
go func() {
for {
select {
case <-ctx.Done():
fmt.Println("工作协程收到退出信号,正在清理...")
time.Sleep(500 * time.Millisecond)
fmt.Println("工作协程已退出")
return
case job := <-jobs:
fmt.Println("处理任务:", job)
}
}
}()
// 模拟发送任务
go func() {
for i := 0; i < 10; i++ {
jobs <- fmt.Sprintf("任务%d", i+1)
time.Sleep(300 * time.Millisecond)
}
}()
// 等待退出信号
select {
case sig := <-sigChan:
fmt.Printf("收到信号: %v,准备退出...\n", sig)
cancel() // 通知所有协程退出
time.Sleep(1 * time.Second) // 等待协程清理
}
}
package main
import (
"fmt"
"math/rand"
"time"
)
func queryDatabase(dbName string, query string, result chan<- string) {
// 模拟数据库查询延迟
delay := time.Duration(rand.Intn(1000)+500) * time.Millisecond
time.Sleep(delay)
result <- fmt.Sprintf("[%s] 查询结果: %s (耗时: %v)", dbName, query, delay)
}
func main() {
rand.Seed(time.Now().UnixNano())
query := "SELECT * FROM users"
result := make(chan string)
// 同时向多个数据库发起查询
databases := []string{"MySQL主库", "MySQL从库", "Redis缓存", "PostgreSQL"}
for _, db := range databases {
go queryDatabase(db, query, result)
}
// 使用 Select 获取最快的结果
select {
case firstResult := <-result:
fmt.Println("最快返回的结果:")
fmt.Println(firstResult)
case <-time.After(2 * time.Second):
fmt.Println("所有查询都超时了")
}
// 注意:其他协程仍在运行,实际应用中应该取消它们
time.Sleep(1 * time.Second)
}
package main
func main() {
select {} // 永久阻塞,程序不会退出
}
这个特性有时用于保持主程序运行,但需要小心使用。
package main
import "fmt"
func main() {
var ch1 chan string // nil Channel
ch2 := make(chan string, 1)
ch2 <- "消息"
select {
case msg := <-ch1: // ch1 是 nil,这个 case 永远不会被选中
fmt.Println("ch1:", msg)
case msg := <-ch2:
fmt.Println("ch2:", msg)
}
}
可以利用这个特性来动态启用/禁用某些 case:
package main
import "fmt"
func main() {
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "来自 ch1"
ch2 <- "来自 ch2"
// 通过设置为 nil 来禁用某个 case
var disabled chan string = nil
select {
case msg := <-ch1:
fmt.Println("ch1:", msg)
case msg := <-disabled: // 这个 case 被禁用
fmt.Println("disabled:", msg)
case msg := <-ch2:
fmt.Println("ch2:", msg)
}
}
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
ch <- "消息1"
ch <- "消息2"
}()
// 只会接收一条消息
select {
case msg := <-ch:
fmt.Println("收到:", msg)
case <-time.After(1 * time.Second):
fmt.Println("超时")
}
// 第二条消息还在 Channel 中
time.Sleep(100 * time.Millisecond)
fmt.Println("剩余消息:", <-ch)
}
如果需要接收所有消息,应该使用循环:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string, 2)
ch <- "消息1"
ch <- "消息2"
close(ch)
// 使用循环接收所有消息
outer:
for {
select {
case msg, ok := <-ch:
if !ok {
fmt.Println("Channel 已关闭")
break outer
}
fmt.Println("收到:", msg)
case <-time.After(1 * time.Second):
fmt.Println("超时")
break outer
}
}
}
// 不好的做法:每次循环都创建新的 Timer
for {
select {
case <-time.After(1 * time.Second): // 每次都创建新 Timer
fmt.Println("超时")
}
}
// 好的做法:复用 Timer
timer := time.NewTimer(1 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
fmt.Println("超时")
timer.Reset(1 * time.Second) // 重置 Timer
}
}
package main
import "fmt"
func main() {
// 无缓冲 Channel:发送方和接收方必须同步
unbuffered := make(chan int)
// 有缓冲 Channel:可以异步发送,提高性能
buffered := make(chan int, 100)
// 使用示例
go func() {
for i := 0; i < 100; i++ {
buffered <- i // 不会阻塞,直到缓冲区满
}
close(buffered)
}()
for i := 0; i < 100; i++ {
fmt.Println(<-buffered)
}
}
package main
import (
"fmt"
"math/rand"
"time"
)
type Task struct {
ID int
Name string
}
type Scheduler struct {
tasks chan Task
results chan string
stop chan bool
maxWorkers int
}
func NewScheduler(maxWorkers int) *Scheduler {
return &Scheduler{
tasks: make(chan Task, 10),
results: make(chan string, 10),
stop: make(chan bool),
maxWorkers: maxWorkers,
}
}
func (s *Scheduler) Start() {
// 启动多个工作协程
for i := 0; i < s.maxWorkers; i++ {
go s.worker(i + 1)
}
// 启动结果收集器
go s.collector()
}
func (s *Scheduler) worker(id int) {
for {
select {
case task := <-s.tasks:
// 模拟任务处理
delay := time.Duration(rand.Intn(500)+100) * time.Millisecond
time.Sleep(delay)
s.results <- fmt.Sprintf("Worker %d 完成任务 %d (%s), 耗时 %v",
id, task.ID, task.Name, delay)
case <-s.stop:
fmt.Printf("Worker %d 正在退出...\n", id)
return
}
}
}
func (s *Scheduler) collector() {
for result := range s.results {
fmt.Println(result)
}
}
func (s *Scheduler) AddTask(task Task) {
s.tasks <- task
}
func (s *Scheduler) Stop() {
close(s.stop)
time.Sleep(500 * time.Millisecond)
close(s.results)
}
func main() {
rand.Seed(time.Now().UnixNano())
scheduler := NewScheduler(3)
scheduler.Start()
// 添加任务
for i := 1; i <= 10; i++ {
scheduler.AddTask(Task{
ID: i,
Name: fmt.Sprintf("任务-%d", i),
})
}
// 等待所有任务完成
time.Sleep(3 * time.Second)
scheduler.Stop()
fmt.Println("调度器已停止")
}
本章详细介绍了 Go 语言中 Select 语句的使用方法:
time.After 可以实现超时控制default 分支可以实现非阻塞的发送和接收Select 是 Go 并发编程的核心工具之一,掌握它能让你写出更加优雅和高效的并发程序。在下一章中,我们将学习更多关于并发同步的知识。