Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

testing/synctest 包详解

概述

testing/synctest 包提供了支持测试并发代码的功能。它通过在隔离的“气泡“(bubble)中运行测试函数,使得并发代码的测试变得简单可靠。

主要用途

  • 测试并发代码和异步操作
  • 隔离测试环境,避免与外部交互
  • 使用虚拟时钟控制时间相关的测试
  • 等待所有 goroutine 完成

核心概念

  • 气泡(Bubble):隔离的测试环境,包含一个主 goroutine 及其启动的所有 goroutine
  • 持久阻塞(Durably Blocked):goroutine 只能被同一气泡内的其他 goroutine 唤醒的阻塞状态
  • 虚拟时钟:气泡内的时间是虚拟的,只在所有 goroutine 持久阻塞时才前进

Go 版本要求:Go 1.24+(实验性功能)

包导入

import "testing/synctest"

函数详解(按 A-Z 分层归类)

T

Test

func Test(t *testing.T, f func(*testing.T))

作用:在新的气泡中执行测试函数 f

参数说明

  • t:测试上下文
  • f:在气泡中执行的测试函数

特点

  • 等待气泡中的所有 goroutine 退出后才返回
  • 如果气泡中的 goroutine 发生死锁,测试将失败
  • 不能在气泡内部调用(不能嵌套使用)
  • 提供的 *testing.T 具有以下特性:
    • T.Cleanup 函数在气泡内运行
    • T.Context 返回与气泡关联的上下文
    • 禁止调用 T.RunT.ParallelT.Deadline

示例

func TestAsync(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 在气泡中执行并发测试
        done := make(chan bool)
        go func() {
            // 执行一些异步操作
            done <- true
        }()
        
        <-done
        // 测试通过
    })
}

W

Wait

func Wait()

作用:阻塞直到当前气泡中除当前 goroutine 外的所有 goroutine 都处于持久阻塞状态

特点

  • 必须在气泡内调用
  • 不能由同一气泡中的多个 goroutine 并发调用
  • 当所有 goroutine 持久阻塞时返回
  • 如果存在死锁,Test 将 panic

持久阻塞的操作

  • 在气泡内创建的通道上进行阻塞发送或接收
  • 阻塞的 select 语句,其中每个案例都是气泡内创建的通道
  • sync.Cond.Wait
  • sync.WaitGroup.Wait(当 Add 在气泡内调用时)
  • time.Sleep

非持久阻塞的操作

  • 锁定 sync.Mutexsync.RWMutex
  • 阻塞在 I/O 上(如网络套接字读取)
  • 系统调用

示例

func TestWait(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        done := false
        go func() {
            done = true
        }()
        
        // Wait 将阻塞直到 goroutine 完成
        synctest.Wait()
        
        t.Log(done) // 总是输出 "true"
    })
}

类型详解

testing/synctest 包不导出任何类型,所有功能通过函数提供。

时间控制

虚拟时钟特性

在气泡内,time 包使用虚拟时钟:

  • 每个气泡有自己的时钟
  • 初始时间为 2000-01-01 UTC 午夜
  • 时间只在所有 goroutine 持久阻塞时才前进
  • 当气泡的根 goroutine 退出时,时间停止前进

时间示例

func TestTime(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        start := time.Now() // 总是 2000-01-01 00:00:00 UTC
        
        go func() {
            time.Sleep(1 * time.Second)
            t.Log(time.Since(start)) // 总是输出 "1s"
        }()
        
        time.Sleep(2 * time.Second) // 上面的 goroutine 会在 Sleep 返回前运行
        t.Log(time.Since(start))    // 总是输出 "2s"
    })
}

重要:这个测试会立即完成,而不是花费 2 秒!

隔离特性

通道隔离

在气泡内创建的通道、time.Timertime.Ticker 与气泡关联:

  • 从气泡外部操作气泡内的通道会导致 panic
  • 从气泡外部操作气泡内的定时器会导致 panic

WaitGroup 关联

sync.WaitGroup 在第一次调用 AddGo 时与气泡关联:

  • 一旦关联,从外部调用 AddGo 是致命错误
  • 包级变量定义的 WaitGroup(如 var wg sync.WaitGroup)无法与气泡关联
  • 存储在包级变量中的 WaitGroup 指针(如 var wg = new(sync.WaitGroup))可以关联

Cond 关联

sync.Cond.Wait 是持久阻塞操作:

  • 从气泡外部唤醒气泡内阻塞的 Cond.Wait 是致命错误

清理函数和终结器

  • 通过 T.Cleanup 注册的清理函数在气泡内运行
  • 通过 runtime.AddCleanupruntime.SetFinalizer 注册的函数在气泡外运行

典型示例

1. 基本异步测试

package synctest_test

import (
    "testing"
    "testing/synctest"
)

func TestBasicAsync(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        result := make(chan int)
        
        go func() {
            result <- 42
        }()
        
        value := <-result
        if value != 42 {
            t.Errorf("expected 42, got %d", value)
        }
    })
}

2. 测试 Context.AfterFunc

package synctest_test

import (
    "context"
    "testing"
    "testing/synctest"
)

func TestContextAfterFunc(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 创建可取消的上下文
        ctx, cancel := context.WithCancel(t.Context())
        
        // 注册取消时执行的函数
        afterFuncCalled := false
        context.AfterFunc(ctx, func() {
            afterFuncCalled = true
        })
        
        // 上下文尚未取消,AfterFunc 不会被调用
        synctest.Wait()
        if afterFuncCalled {
            t.Fatal("before context is canceled: AfterFunc called")
        }
        
        // 取消上下文并等待 AfterFunc 执行
        cancel()
        synctest.Wait()
        if !afterFuncCalled {
            t.Fatal("after context is canceled: AfterFunc not called")
        }
    })
}

3. 测试 Context.WithTimeout

package synctest_test

import (
    "context"
    "testing"
    "testing/synctest"
    "time"
)

func TestContextWithTimeout(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        const timeout = 5 * time.Second
        ctx, cancel := context.WithTimeout(t.Context(), timeout)
        defer cancel()
        
        // 等待略少于超时时间
        time.Sleep(timeout - time.Nanosecond)
        synctest.Wait()
        
        if err := ctx.Err(); err != nil {
            t.Fatalf("before timeout: ctx.Err() = %v, want nil", err)
        }
        
        // 等待剩余时间直到超时
        time.Sleep(time.Nanosecond)
        synctest.Wait()
        
        if err := ctx.Err(); err != context.DeadlineExceeded {
            t.Fatalf("after timeout: ctx.Err() = %v, want DeadlineExceeded", err)
        }
    })
}

4. 测试多个 Goroutine 同步

package synctest_test

import (
    "sync"
    "testing"
    "testing/synctest"
)

func TestMultipleGoroutines(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        var wg sync.WaitGroup
        results := make([]int, 3)
        
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(idx int) {
                defer wg.Done()
                results[idx] = idx * 2
            }(i)
        }
        
        wg.Wait()
        synctest.Wait()
        
        expected := []int{0, 2, 4}
        for i, v := range results {
            if v != expected[i] {
                t.Errorf("results[%d] = %d, want %d", i, v, expected[i])
            }
        }
    })
}

5. 测试通道通信

package synctest_test

import (
    "testing"
    "testing/synctest"
)

func TestChannelCommunication(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ch := make(chan int)
        done := make(chan bool)
        
        // 生产者
        go func() {
            for i := 0; i < 5; i++ {
                ch <- i
            }
            close(ch)
        }()
        
        // 消费者
        go func() {
            sum := 0
            for v := range ch {
                sum += v
            }
            t.Log("sum =", sum)
            done <- true
        }()
        
        synctest.Wait()
        <-done
    })
}

6. 测试 Select 语句

package synctest_test

import (
    "testing"
    "testing/synctest"
)

func TestSelectStatement(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ch1 := make(chan int)
        ch2 := make(chan string)
        result := make(chan string)
        
        go func() {
            select {
            case v := <-ch1:
                result <- "got int: " + string(rune(v))
            case v := <-ch2:
                result <- "got string: " + v
            }
        }()
        
        ch2 <- "hello"
        synctest.Wait()
        
        msg := <-result
        t.Log(msg)
    })
}

7. 测试 Time.Sleep 和定时器

package synctest_test

import (
    "testing"
    "testing/synctest"
    "time"
)

func TestSleepAndTicker(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        start := time.Now()
        
        // 多次 Sleep 会快速执行
        time.Sleep(1 * time.Second)
        time.Sleep(2 * time.Second)
        time.Sleep(3 * time.Second)
        
        elapsed := time.Since(start)
        if elapsed != 6*time.Second {
            t.Errorf("elapsed = %v, want 6s", elapsed)
        }
        
        // 测试定时器
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        
        count := 0
        done := make(chan bool)
        
        go func() {
            for range ticker.C {
                count++
                if count >= 3 {
                    done <- true
                    return
                }
            }
        }()
        
        synctest.Wait()
        <-done
        
        t.Log("ticker fired", count, "times")
    })
}

8. 测试 Cond 同步

package synctest_test

import (
    "sync"
    "testing"
    "testing/synctest"
)

func TestCondSync(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        var mu sync.Mutex
        cond := sync.NewCond(&mu)
        ready := false
        
        // 等待者
        go func() {
            mu.Lock()
            for !ready {
                cond.Wait()
            }
            t.Log("condition is ready")
            mu.Unlock()
        }()
        
        // 给予等待者时间进入 Wait
        synctest.Wait()
        
        // 通知者
        mu.Lock()
        ready = true
        cond.Broadcast()
        mu.Unlock()
        
        synctest.Wait()
    })
}

9. 测试 HTTP 100 Continue(高级示例)

package synctest_test

import (
    "bufio"
    "bytes"
    "io"
    "net"
    "net/http"
    "strings"
    "testing"
    "testing/synctest"
    "time"
)

func TestHTTPTransport100Continue(t *testing.T) {
    synctest.Test(t, func(*testing.T) {
        // 创建进程内假网络连接
        srvConn, cliConn := net.Pipe()
        defer cliConn.Close()
        defer srvConn.Close()
        
        tr := &http.Transport{
            DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
                return cliConn, nil
            },
            ExpectContinueTimeout: 5 * time.Second,
        }
        
        body := "request body"
        go func() {
            req, _ := http.NewRequest("PUT", "http://test.tld/", strings.NewReader(body))
            req.Header.Set("Expect", "100-continue")
            resp, err := tr.RoundTrip(req)
            if err != nil {
                t.Errorf("RoundTrip: unexpected error %v", err)
            } else {
                resp.Body.Close()
            }
        }()
        
        // 读取请求头
        req, err := http.ReadRequest(bufio.NewReader(srvConn))
        if err != nil {
            t.Fatalf("ReadRequest: %v", err)
        }
        
        // 复制请求体
        var gotBody bytes.Buffer
        go io.Copy(&gotBody, req.Body)
        synctest.Wait()
        
        if got, want := gotBody.String(), ""; got != want {
            t.Fatalf("before 100 Continue, read body: %q, want %q", got, want)
        }
        
        // 发送 100 Continue 响应
        srvConn.Write([]byte("HTTP/1.1 100 Continue\r\n\r\n"))
        synctest.Wait()
        
        if got, want := gotBody.String(), body; got != want {
            t.Fatalf("after 100 Continue, read body: %q, want %q", got, want)
        }
        
        // 发送最终响应
        srvConn.Write([]byte("HTTP/1.1 200 OK\r\n\r\n"))
    })
}

10. 测试带 Cleanup 的异步操作

package synctest_test

import (
    "testing"
    "testing/synctest"
)

func TestWithCleanup(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ch := make(chan int)
        done := make(chan bool)
        
        // 启动后台 goroutine
        go func() {
            for {
                select {
                case v := <-ch:
                    t.Log("received:", v)
                case <-done:
                    t.Log("cleanup done")
                    return
                }
            }
        }()
        
        ch <- 1
        ch <- 2
        ch <- 3
        
        t.Cleanup(func() {
            close(done)
        })
    })
}

最佳实践

1. 使用 Test 包裹所有并发测试

func TestConcurrent(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 所有并发代码都在气泡中运行
    })
}

2. 使用 Wait 等待 goroutine 完成

func TestWithWait(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        go func() {
            // 执行一些工作
        }()
        
        synctest.Wait() // 等待所有 goroutine 阻塞
    })
}

3. 利用虚拟时钟加速时间相关测试

func TestTimeout(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
        defer cancel()
        
        // 测试会立即完成,不需要等待 5 秒
        time.Sleep(5 * time.Second)
        
        if ctx.Err() != context.DeadlineExceeded {
            t.Error("expected timeout")
        }
    })
}

4. 避免与外部交互

// 不好的做法
func TestWithNetwork(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 避免真实的网络调用
        resp, err := http.Get("http://example.com")
    })
}

// 好的做法
func TestWithFakeNetwork(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 使用假的网络连接
        srvConn, cliConn := net.Pipe()
    })
}

5. 使用包级变量的 WaitGroup 指针

// 不好的做法
var wg sync.WaitGroup // 无法与气泡关联

// 好的做法
var wg = new(sync.WaitGroup) // 可以与气泡关联

6. 避免嵌套 Test 调用

// 错误:不能在气泡内调用 Test
synctest.Test(t, func(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 这会导致错误
    })
})

与其他包配合

testing 包

import (
    "testing"
    "testing/synctest"
)

func TestExample(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 测试代码
    })
}

context 包

import (
    "context"
    "testing/synctest"
)

func TestContext(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ctx := t.Context() // 返回与气泡关联的上下文
        ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    })
}

sync 包

import (
    "sync"
    "testing/synctest"
)

func TestSync(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            // 工作
        }()
        wg.Wait()
        synctest.Wait()
    })
}

time 包

import (
    "testing/synctest"
    "time"
)

func TestTime(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        start := time.Now()
        time.Sleep(1 * time.Hour) // 虚拟时间,立即完成
        elapsed := time.Since(start)
        t.Log("elapsed:", elapsed)
    })
}

注意事项

1. 禁止嵌套使用

// 错误示例
synctest.Test(t, func(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // panic: Test must not be called from within a bubble
    })
})

2. Wait 的并发调用限制

// 错误示例
synctest.Test(t, func(t *testing.T) {
    go func() {
        synctest.Wait() // 错误:并发调用
    }()
    synctest.Wait() // 错误:并发调用
})

3. 避免网络 I/O

网络 I/O 不是持久阻塞操作,会阻止气泡进入空闲状态:

// 避免这样做
synctest.Test(t, func(t *testing.T) {
    conn, _ := net.Dial("tcp", "example.com:80")
    conn.Read(buf) // 不会持久阻塞
})

4. Mutex 不是持久阻塞

// Mutex 锁定不是持久阻塞
var mu sync.Mutex
mu.Lock()
go func() {
    mu.Lock() // 这不是持久阻塞
    // ...
    mu.Unlock()
}()
synctest.Wait() // 可能不会等待

5. 清理函数在气泡内运行

synctest.Test(t, func(t *testing.T) {
    ch := make(chan int)
    
    t.Cleanup(func() {
        // 在气泡内运行
        close(ch)
    })
})

6. 终结器在气泡外运行

synctest.Test(t, func(t *testing.T) {
    obj := &MyObject{}
    
    runtime.SetFinalizer(obj, func(o *MyObject) {
        // 在气泡外运行
    })
})

快速参考

函数速查表

函数作用
Test(t, f)在气泡中执行测试函数
Wait()等待所有 goroutine 持久阻塞

持久阻塞操作

操作是否持久阻塞
气泡内通道的阻塞发送/接收✅ 是
气泡内通道的阻塞 select✅ 是
sync.Cond.Wait✅ 是
sync.WaitGroup.Wait✅ 是(当 Add 在气泡内调用)
time.Sleep✅ 是
sync.Mutex.Lock❌ 否
sync.RWMutex.Lock❌ 否
网络 I/O❌ 否
系统调用❌ 否

虚拟时钟特性

  • 初始时间:2000-01-01 00:00:00 UTC
  • 时间前进:仅当所有 goroutine 持久阻塞时
  • 时间停止:当根 goroutine 退出时

常见模式

// 基本并发测试
func TestConcurrent(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 并发代码
        synctest.Wait()
    })
}

// 超时测试
func TestTimeout(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ctx, cancel := context.WithTimeout(t.Context(), timeout)
        defer cancel()
        // 测试立即完成
    })
}

// 通道测试
func TestChannel(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        ch := make(chan T)
        // 使用通道
        synctest.Wait()
    })
}

隔离规则

资源气泡内创建气泡外操作
通道✅ 关联气泡❌ panic
Timer✅ 关联气泡❌ panic
Ticker✅ 关联气泡❌ panic
WaitGroup✅ 关联气泡❌ 致命错误
Cond✅ 关联气泡❌ 致命错误

总结

testing/synctest 包为并发代码测试提供了强大的支持:

核心功能

  • Test:在隔离气泡中执行测试
  • Wait:等待所有 goroutine 持久阻塞

主要优势

  1. 隔离性:测试完全自包含,不与外部交互
  2. 虚拟时间:时间相关的测试可以立即完成
  3. 自动同步:自动等待 goroutine 完成
  4. 死锁检测:自动检测死锁并报告

适用场景

  • 并发算法测试
  • 异步操作测试
  • 超时和重试逻辑测试
  • 通道通信测试
  • Context 测试
  • HTTP 客户端测试

使用建议

  1. 使用 Test 包裹所有并发测试
  2. 使用 Wait 等待 goroutine 同步
  3. 避免网络 I/O 和系统调用
  4. 使用假的网络连接进行测试
  5. 注意持久阻塞和非持久阻塞的区别

典型用法

func TestAsyncOperation(t *testing.T) {
    synctest.Test(t, func(t *testing.T) {
        // 启动异步操作
        go doWork()
        
        // 等待完成
        synctest.Wait()
        
        // 验证结果
        verifyResult()
    })
}

通过 testing/synctest 包,可以使并发代码的测试变得简单、可靠和快速。