> 技术文档 > Go进阶高并发(多线程)处理教程

Go进阶高并发(多线程)处理教程


Go进阶高并发处理教程

目录

  1. Go并发编程基础
  2. Goroutine深入理解
  3. 同步原语详解
  4. 并发模式与最佳实践
  5. 性能优化技巧
  6. 实战案例

Go并发编程基础

什么是并发?

并发是指程序能够同时处理多个任务的能力。Go语言从设计之初就将并发作为核心特性,提供了简洁而强大的并发编程模型

Go并发模型的优势

  • 轻量级协程:Goroutine比传统线程更轻量
  • CSP模型:通过通信来共享内存,而不是通过共享内存来通信
  • 内置调度器:Go运行时自动管理goroutine的调度

Goroutine深入理解

创建和启动Goroutine

package mainimport ( \"fmt\" \"time\")func worker(id int) { fmt.Printf(\"Worker %d starting\\n\", id) time.Sleep(time.Second) fmt.Printf(\"Worker %d done\\n\", id)}func main() { // 启动多个goroutine for i := 1; i <= 5; i++ { go worker(i) } // 等待所有goroutine完成 time.Sleep(2 * time.Second) fmt.Println(\"All workers completed\")}

Goroutine的生命周期

  1. 创建:使用go关键字创建
  2. 调度:由Go调度器管理
  3. 执行:在可用的OS线程上执行
  4. 结束:函数返回时自动结束

调度器工作原理

Go使用M:N调度模型:

  • M:OS线程(Machine)
  • P:处理器(Processor)
  • G:Goroutine
G1 G2 G3 G4 \\ | | / \\ | | / \\ | |/ \\| | P1 P2 | | M1 M2

同步原语详解

sync.WaitGroup

用于等待一组goroutine完成:

package mainimport ( \"fmt\" \"sync\" \"time\")func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // 完成时调用Done() fmt.Printf(\"Worker %d starting\\n\", id) time.Sleep(time.Second) fmt.Printf(\"Worker %d done\\n\", id)}func main() { var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) // 增加等待计数 go worker(i, &wg) } wg.Wait() // 等待所有goroutine完成 fmt.Println(\"All workers completed\")}

sync.Mutex

互斥锁用于保护共享资源:

package mainimport ( \"fmt\" \"sync\")type Counter struct { mu sync.Mutex value int}func (c *Counter) Increment() { c.mu.Lock() defer c.mu.Unlock() c.value++}func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.value}func main() { counter := &Counter{} var wg sync.WaitGroup // 启动100个goroutine同时增加计数器 for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 1000; j++ { counter.Increment() } }() } wg.Wait() fmt.Printf(\"Final counter value: %d\\n\", counter.Value())}

sync.RWMutex

读写锁允许多个读操作同时进行:

type SafeMap struct { mu sync.RWMutex data map[string]int}func (sm *SafeMap) Get(key string) (int, bool) { sm.mu.RLock() defer sm.mu.RUnlock() val, ok := sm.data[key] return val, ok}func (sm *SafeMap) Set(key string, value int) { sm.mu.Lock() defer sm.mu.Unlock() sm.data[key] = value}

sync.Once

确保某个操作只执行一次:

package mainimport ( \"fmt\" \"sync\")var once sync.Oncevar instance *Singletontype Singleton struct { data string}func GetInstance() *Singleton { once.Do(func() { fmt.Println(\"Creating singleton instance\") instance = &Singleton{data: \"singleton\"} }) return instance}func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() s := GetInstance() fmt.Printf(\"Goroutine %d got instance: %s\\n\", id, s.data) }(i) } wg.Wait()}

并发模式与最佳实践

Worker Pool模式

package mainimport ( \"fmt\" \"sync\" \"time\")type Job struct { ID int Data string}type Result struct { Job Job Output string}func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { fmt.Printf(\"Worker %d processing job %d\\n\", id, job.ID) time.Sleep(time.Millisecond * 100) // 模拟工作 result := Result{ Job: job, Output: fmt.Sprintf(\"Processed by worker %d\", id), } results <- result }}func main() { const numWorkers = 3 const numJobs = 10 jobs := make(chan Job, numJobs) results := make(chan Result, numJobs) var wg sync.WaitGroup // 启动worker for i := 1; i <= numWorkers; i++ { wg.Add(1) go worker(i, jobs, results, &wg) } // 发送任务 for i := 1; i <= numJobs; i++ { jobs <- Job{ID: i, Data: fmt.Sprintf(\"data-%d\", i)} } close(jobs) // 等待所有worker完成 go func() { wg.Wait() close(results) }() // 收集结果 for result := range results { fmt.Printf(\"Job %d result: %s\\n\", result.Job.ID, result.Output) }}

扇入扇出模式

// 扇出:将工作分发给多个goroutinefunc fanOut(input <-chan int, workers int) []<-chan int { outputs := make([]<-chan int, workers) for i := 0; i < workers; i++ { output := make(chan int) outputs[i] = output go func(out chan<- int) { defer close(out) for n := range input { out <- n * n // 计算平方 } }(output) } return outputs}// 扇入:将多个channel的结果合并func fanIn(inputs ...<-chan int) <-chan int { output := make(chan int) var wg sync.WaitGroup for _, input := range inputs { wg.Add(1) go func(in <-chan int) { defer wg.Done() for n := range in { output <- n } }(input) } go func() { wg.Wait() close(output) }() return output}

性能优化技巧

1. 合理设置GOMAXPROCS

import \"runtime\"func init() { // 设置使用的CPU核心数 runtime.GOMAXPROCS(runtime.NumCPU())}

2. 避免goroutine泄漏

// 错误示例:可能导致goroutine泄漏func badExample() { ch := make(chan int) go func() { ch <- 1 // 如果没有接收者,这个goroutine会永远阻塞 }() // 函数返回,但goroutine仍在运行}// 正确示例:使用context控制goroutine生命周期func goodExample(ctx context.Context) { ch := make(chan int, 1) // 使用缓冲channel go func() { select { case ch <- 1: case <-ctx.Done(): return } }()}

3. 使用对象池减少GC压力

import \"sync\"var pool = sync.Pool{ New: func() interface{} { return make([]byte, 1024) },}func processData(data []byte) { buf := pool.Get().([]byte) defer pool.Put(buf) // 使用buf处理数据}

实战案例

并发HTTP客户端

package mainimport ( \"fmt\" \"net/http\" \"sync\" \"time\")type Result struct { URL string StatusCode int Duration time.Duration Error error}func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) { defer wg.Done() start := time.Now() resp, err := http.Get(url) duration := time.Since(start) result := Result{ URL: url, Duration: duration, Error: err, } if err == nil { result.StatusCode = resp.StatusCode resp.Body.Close() } results <- result}func main() { urls := []string{ \"https://www.google.com\", \"https://www.github.com\", \"https://www.stackoverflow.com\", \"https://www.golang.org\", } results := make(chan Result, len(urls)) var wg sync.WaitGroup // 并发请求所有URL for _, url := range urls { wg.Add(1) go fetchURL(url, results, &wg) } // 等待所有请求完成 go func() { wg.Wait() close(results) }() // 处理结果 for result := range results { if result.Error != nil { fmt.Printf(\"Error fetching %s: %v\\n\", result.URL, result.Error) } else { fmt.Printf(\"%s: %d (%v)\\n\", result.URL, result.StatusCode, result.Duration) } }}

总结

Go语言的并发编程提供了强大而简洁的工具:

  1. Goroutine:轻量级协程,易于创建和管理
  2. Channel:类型安全的通信机制
  3. sync包:提供各种同步原语
  4. 并发模式:Worker Pool、扇入扇出等经典模式

掌握这些概念和技巧,能够帮助您构建高性能、可扩展的并发应用程序。记住Go的并发哲学:通过通信来共享内存,而不是通过共享内存来通信

参考资源

  • Go官方文档 - 并发
  • Go并发模式
  • Go内存模型