testing/synctest 包详解
概述
testing/synctest 包提供了支持测试并发代码的功能。它通过在隔离的“气泡“(bubble)中运行测试函数,使得并发代码的测试变得简单可靠。
主要用途:
- 测试并发代码和异步操作
- 隔离测试环境,避免与外部交互
- 使用虚拟时钟控制时间相关的测试
- 等待所有 goroutine 完成
核心概念:
- 气泡(Bubble):隔离的测试环境,包含一个主 goroutine 及其启动的所有 goroutine
- 持久阻塞(Durably Blocked):goroutine 只能被同一气泡内的其他 goroutine 唤醒的阻塞状态
- 虚拟时钟:气泡内的时间是虚拟的,只在所有 goroutine 持久阻塞时才前进
Go 版本要求:Go 1.24+(实验性功能)
包导入
import "testing/synctest"
函数详解(按 A-Z 分层归类)
T
Test
func Test(t *testing.T, f func(*testing.T))
作用:在新的气泡中执行测试函数 f
参数说明:
t:测试上下文f:在气泡中执行的测试函数
特点:
- 等待气泡中的所有 goroutine 退出后才返回
- 如果气泡中的 goroutine 发生死锁,测试将失败
- 不能在气泡内部调用(不能嵌套使用)
- 提供的
*testing.T具有以下特性:T.Cleanup函数在气泡内运行T.Context返回与气泡关联的上下文- 禁止调用
T.Run、T.Parallel和T.Deadline
示例:
func TestAsync(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 在气泡中执行并发测试
done := make(chan bool)
go func() {
// 执行一些异步操作
done <- true
}()
<-done
// 测试通过
})
}
W
Wait
func Wait()
作用:阻塞直到当前气泡中除当前 goroutine 外的所有 goroutine 都处于持久阻塞状态
特点:
- 必须在气泡内调用
- 不能由同一气泡中的多个 goroutine 并发调用
- 当所有 goroutine 持久阻塞时返回
- 如果存在死锁,Test 将 panic
持久阻塞的操作:
- 在气泡内创建的通道上进行阻塞发送或接收
- 阻塞的 select 语句,其中每个案例都是气泡内创建的通道
sync.Cond.Waitsync.WaitGroup.Wait(当Add在气泡内调用时)time.Sleep
非持久阻塞的操作:
- 锁定
sync.Mutex或sync.RWMutex - 阻塞在 I/O 上(如网络套接字读取)
- 系统调用
示例:
func TestWait(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
done := false
go func() {
done = true
}()
// Wait 将阻塞直到 goroutine 完成
synctest.Wait()
t.Log(done) // 总是输出 "true"
})
}
类型详解
testing/synctest 包不导出任何类型,所有功能通过函数提供。
时间控制
虚拟时钟特性
在气泡内,time 包使用虚拟时钟:
- 每个气泡有自己的时钟
- 初始时间为 2000-01-01 UTC 午夜
- 时间只在所有 goroutine 持久阻塞时才前进
- 当气泡的根 goroutine 退出时,时间停止前进
时间示例
func TestTime(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
start := time.Now() // 总是 2000-01-01 00:00:00 UTC
go func() {
time.Sleep(1 * time.Second)
t.Log(time.Since(start)) // 总是输出 "1s"
}()
time.Sleep(2 * time.Second) // 上面的 goroutine 会在 Sleep 返回前运行
t.Log(time.Since(start)) // 总是输出 "2s"
})
}
重要:这个测试会立即完成,而不是花费 2 秒!
隔离特性
通道隔离
在气泡内创建的通道、time.Timer 或 time.Ticker 与气泡关联:
- 从气泡外部操作气泡内的通道会导致 panic
- 从气泡外部操作气泡内的定时器会导致 panic
WaitGroup 关联
sync.WaitGroup 在第一次调用 Add 或 Go 时与气泡关联:
- 一旦关联,从外部调用
Add或Go是致命错误 - 包级变量定义的 WaitGroup(如
var wg sync.WaitGroup)无法与气泡关联 - 存储在包级变量中的 WaitGroup 指针(如
var wg = new(sync.WaitGroup))可以关联
Cond 关联
sync.Cond.Wait 是持久阻塞操作:
- 从气泡外部唤醒气泡内阻塞的
Cond.Wait是致命错误
清理函数和终结器
- 通过
T.Cleanup注册的清理函数在气泡内运行 - 通过
runtime.AddCleanup和runtime.SetFinalizer注册的函数在气泡外运行
典型示例
1. 基本异步测试
package synctest_test
import (
"testing"
"testing/synctest"
)
func TestBasicAsync(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
result := make(chan int)
go func() {
result <- 42
}()
value := <-result
if value != 42 {
t.Errorf("expected 42, got %d", value)
}
})
}
2. 测试 Context.AfterFunc
package synctest_test
import (
"context"
"testing"
"testing/synctest"
)
func TestContextAfterFunc(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 创建可取消的上下文
ctx, cancel := context.WithCancel(t.Context())
// 注册取消时执行的函数
afterFuncCalled := false
context.AfterFunc(ctx, func() {
afterFuncCalled = true
})
// 上下文尚未取消,AfterFunc 不会被调用
synctest.Wait()
if afterFuncCalled {
t.Fatal("before context is canceled: AfterFunc called")
}
// 取消上下文并等待 AfterFunc 执行
cancel()
synctest.Wait()
if !afterFuncCalled {
t.Fatal("after context is canceled: AfterFunc not called")
}
})
}
3. 测试 Context.WithTimeout
package synctest_test
import (
"context"
"testing"
"testing/synctest"
"time"
)
func TestContextWithTimeout(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(t.Context(), timeout)
defer cancel()
// 等待略少于超时时间
time.Sleep(timeout - time.Nanosecond)
synctest.Wait()
if err := ctx.Err(); err != nil {
t.Fatalf("before timeout: ctx.Err() = %v, want nil", err)
}
// 等待剩余时间直到超时
time.Sleep(time.Nanosecond)
synctest.Wait()
if err := ctx.Err(); err != context.DeadlineExceeded {
t.Fatalf("after timeout: ctx.Err() = %v, want DeadlineExceeded", err)
}
})
}
4. 测试多个 Goroutine 同步
package synctest_test
import (
"sync"
"testing"
"testing/synctest"
)
func TestMultipleGoroutines(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var wg sync.WaitGroup
results := make([]int, 3)
for i := 0; i < 3; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
results[idx] = idx * 2
}(i)
}
wg.Wait()
synctest.Wait()
expected := []int{0, 2, 4}
for i, v := range results {
if v != expected[i] {
t.Errorf("results[%d] = %d, want %d", i, v, expected[i])
}
}
})
}
5. 测试通道通信
package synctest_test
import (
"testing"
"testing/synctest"
)
func TestChannelCommunication(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ch := make(chan int)
done := make(chan bool)
// 生产者
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch)
}()
// 消费者
go func() {
sum := 0
for v := range ch {
sum += v
}
t.Log("sum =", sum)
done <- true
}()
synctest.Wait()
<-done
})
}
6. 测试 Select 语句
package synctest_test
import (
"testing"
"testing/synctest"
)
func TestSelectStatement(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ch1 := make(chan int)
ch2 := make(chan string)
result := make(chan string)
go func() {
select {
case v := <-ch1:
result <- "got int: " + string(rune(v))
case v := <-ch2:
result <- "got string: " + v
}
}()
ch2 <- "hello"
synctest.Wait()
msg := <-result
t.Log(msg)
})
}
7. 测试 Time.Sleep 和定时器
package synctest_test
import (
"testing"
"testing/synctest"
"time"
)
func TestSleepAndTicker(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
start := time.Now()
// 多次 Sleep 会快速执行
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
time.Sleep(3 * time.Second)
elapsed := time.Since(start)
if elapsed != 6*time.Second {
t.Errorf("elapsed = %v, want 6s", elapsed)
}
// 测试定时器
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
count := 0
done := make(chan bool)
go func() {
for range ticker.C {
count++
if count >= 3 {
done <- true
return
}
}
}()
synctest.Wait()
<-done
t.Log("ticker fired", count, "times")
})
}
8. 测试 Cond 同步
package synctest_test
import (
"sync"
"testing"
"testing/synctest"
)
func TestCondSync(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var mu sync.Mutex
cond := sync.NewCond(&mu)
ready := false
// 等待者
go func() {
mu.Lock()
for !ready {
cond.Wait()
}
t.Log("condition is ready")
mu.Unlock()
}()
// 给予等待者时间进入 Wait
synctest.Wait()
// 通知者
mu.Lock()
ready = true
cond.Broadcast()
mu.Unlock()
synctest.Wait()
})
}
9. 测试 HTTP 100 Continue(高级示例)
package synctest_test
import (
"bufio"
"bytes"
"io"
"net"
"net/http"
"strings"
"testing"
"testing/synctest"
"time"
)
func TestHTTPTransport100Continue(t *testing.T) {
synctest.Test(t, func(*testing.T) {
// 创建进程内假网络连接
srvConn, cliConn := net.Pipe()
defer cliConn.Close()
defer srvConn.Close()
tr := &http.Transport{
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
return cliConn, nil
},
ExpectContinueTimeout: 5 * time.Second,
}
body := "request body"
go func() {
req, _ := http.NewRequest("PUT", "http://test.tld/", strings.NewReader(body))
req.Header.Set("Expect", "100-continue")
resp, err := tr.RoundTrip(req)
if err != nil {
t.Errorf("RoundTrip: unexpected error %v", err)
} else {
resp.Body.Close()
}
}()
// 读取请求头
req, err := http.ReadRequest(bufio.NewReader(srvConn))
if err != nil {
t.Fatalf("ReadRequest: %v", err)
}
// 复制请求体
var gotBody bytes.Buffer
go io.Copy(&gotBody, req.Body)
synctest.Wait()
if got, want := gotBody.String(), ""; got != want {
t.Fatalf("before 100 Continue, read body: %q, want %q", got, want)
}
// 发送 100 Continue 响应
srvConn.Write([]byte("HTTP/1.1 100 Continue\r\n\r\n"))
synctest.Wait()
if got, want := gotBody.String(), body; got != want {
t.Fatalf("after 100 Continue, read body: %q, want %q", got, want)
}
// 发送最终响应
srvConn.Write([]byte("HTTP/1.1 200 OK\r\n\r\n"))
})
}
10. 测试带 Cleanup 的异步操作
package synctest_test
import (
"testing"
"testing/synctest"
)
func TestWithCleanup(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ch := make(chan int)
done := make(chan bool)
// 启动后台 goroutine
go func() {
for {
select {
case v := <-ch:
t.Log("received:", v)
case <-done:
t.Log("cleanup done")
return
}
}
}()
ch <- 1
ch <- 2
ch <- 3
t.Cleanup(func() {
close(done)
})
})
}
最佳实践
1. 使用 Test 包裹所有并发测试
func TestConcurrent(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 所有并发代码都在气泡中运行
})
}
2. 使用 Wait 等待 goroutine 完成
func TestWithWait(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
go func() {
// 执行一些工作
}()
synctest.Wait() // 等待所有 goroutine 阻塞
})
}
3. 利用虚拟时钟加速时间相关测试
func TestTimeout(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
// 测试会立即完成,不需要等待 5 秒
time.Sleep(5 * time.Second)
if ctx.Err() != context.DeadlineExceeded {
t.Error("expected timeout")
}
})
}
4. 避免与外部交互
// 不好的做法
func TestWithNetwork(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 避免真实的网络调用
resp, err := http.Get("http://example.com")
})
}
// 好的做法
func TestWithFakeNetwork(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 使用假的网络连接
srvConn, cliConn := net.Pipe()
})
}
5. 使用包级变量的 WaitGroup 指针
// 不好的做法
var wg sync.WaitGroup // 无法与气泡关联
// 好的做法
var wg = new(sync.WaitGroup) // 可以与气泡关联
6. 避免嵌套 Test 调用
// 错误:不能在气泡内调用 Test
synctest.Test(t, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 这会导致错误
})
})
与其他包配合
testing 包
import (
"testing"
"testing/synctest"
)
func TestExample(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 测试代码
})
}
context 包
import (
"context"
"testing/synctest"
)
func TestContext(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx := t.Context() // 返回与气泡关联的上下文
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
})
}
sync 包
import (
"sync"
"testing/synctest"
)
func TestSync(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// 工作
}()
wg.Wait()
synctest.Wait()
})
}
time 包
import (
"testing/synctest"
"time"
)
func TestTime(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
start := time.Now()
time.Sleep(1 * time.Hour) // 虚拟时间,立即完成
elapsed := time.Since(start)
t.Log("elapsed:", elapsed)
})
}
注意事项
1. 禁止嵌套使用
// 错误示例
synctest.Test(t, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// panic: Test must not be called from within a bubble
})
})
2. Wait 的并发调用限制
// 错误示例
synctest.Test(t, func(t *testing.T) {
go func() {
synctest.Wait() // 错误:并发调用
}()
synctest.Wait() // 错误:并发调用
})
3. 避免网络 I/O
网络 I/O 不是持久阻塞操作,会阻止气泡进入空闲状态:
// 避免这样做
synctest.Test(t, func(t *testing.T) {
conn, _ := net.Dial("tcp", "example.com:80")
conn.Read(buf) // 不会持久阻塞
})
4. Mutex 不是持久阻塞
// Mutex 锁定不是持久阻塞
var mu sync.Mutex
mu.Lock()
go func() {
mu.Lock() // 这不是持久阻塞
// ...
mu.Unlock()
}()
synctest.Wait() // 可能不会等待
5. 清理函数在气泡内运行
synctest.Test(t, func(t *testing.T) {
ch := make(chan int)
t.Cleanup(func() {
// 在气泡内运行
close(ch)
})
})
6. 终结器在气泡外运行
synctest.Test(t, func(t *testing.T) {
obj := &MyObject{}
runtime.SetFinalizer(obj, func(o *MyObject) {
// 在气泡外运行
})
})
快速参考
函数速查表
| 函数 | 作用 |
|---|---|
Test(t, f) | 在气泡中执行测试函数 |
Wait() | 等待所有 goroutine 持久阻塞 |
持久阻塞操作
| 操作 | 是否持久阻塞 |
|---|---|
| 气泡内通道的阻塞发送/接收 | ✅ 是 |
| 气泡内通道的阻塞 select | ✅ 是 |
sync.Cond.Wait | ✅ 是 |
sync.WaitGroup.Wait | ✅ 是(当 Add 在气泡内调用) |
time.Sleep | ✅ 是 |
sync.Mutex.Lock | ❌ 否 |
sync.RWMutex.Lock | ❌ 否 |
| 网络 I/O | ❌ 否 |
| 系统调用 | ❌ 否 |
虚拟时钟特性
- 初始时间:2000-01-01 00:00:00 UTC
- 时间前进:仅当所有 goroutine 持久阻塞时
- 时间停止:当根 goroutine 退出时
常见模式
// 基本并发测试
func TestConcurrent(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 并发代码
synctest.Wait()
})
}
// 超时测试
func TestTimeout(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), timeout)
defer cancel()
// 测试立即完成
})
}
// 通道测试
func TestChannel(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ch := make(chan T)
// 使用通道
synctest.Wait()
})
}
隔离规则
| 资源 | 气泡内创建 | 气泡外操作 |
|---|---|---|
| 通道 | ✅ 关联气泡 | ❌ panic |
| Timer | ✅ 关联气泡 | ❌ panic |
| Ticker | ✅ 关联气泡 | ❌ panic |
| WaitGroup | ✅ 关联气泡 | ❌ 致命错误 |
| Cond | ✅ 关联气泡 | ❌ 致命错误 |
总结
testing/synctest 包为并发代码测试提供了强大的支持:
核心功能:
Test:在隔离气泡中执行测试Wait:等待所有 goroutine 持久阻塞
主要优势:
- 隔离性:测试完全自包含,不与外部交互
- 虚拟时间:时间相关的测试可以立即完成
- 自动同步:自动等待 goroutine 完成
- 死锁检测:自动检测死锁并报告
适用场景:
- 并发算法测试
- 异步操作测试
- 超时和重试逻辑测试
- 通道通信测试
- Context 测试
- HTTP 客户端测试
使用建议:
- 使用
Test包裹所有并发测试 - 使用
Wait等待 goroutine 同步 - 避免网络 I/O 和系统调用
- 使用假的网络连接进行测试
- 注意持久阻塞和非持久阻塞的区别
典型用法:
func TestAsyncOperation(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
// 启动异步操作
go doWork()
// 等待完成
synctest.Wait()
// 验证结果
verifyResult()
})
}
通过 testing/synctest 包,可以使并发代码的测试变得简单、可靠和快速。