主题
05 - 实战项目:并发 Web 爬虫
项目目标
用前四节学到的所有知识,写一个并发 Web 爬虫:
┌─────────────────────────────────────────────────────────┐
│ 并发 Web 爬虫架构 │
│ │
│ ┌──────────┐ ┌────────────┐ ┌──────────────┐ │
│ │ URL 队列 │────▶│ Worker Pool│────▶│ 结果收集器 │ │
│ │ (channel) │ │ (N个并发) │ │ (channel) │ │
│ └──────────┘ └────────────┘ └──────────────┘ │
│ ▲ │ │ │
│ │ │ ▼ │
│ │ 发现新URL │ ┌──────────┐ │
│ └─────────────────┘ │ 输出结果 │ │
│ └──────────┘ │
│ │
│ 涉及知识点: │
│ ✓ goroutine + WaitGroup │
│ ✓ channel (带缓冲) │
│ ✓ sync.Mutex (去重) │
│ ✓ context (超时控制) │
│ ✓ 结构体 + 接口 + 错误处理 │
└─────────────────────────────────────────────────────────┘完整代码
第一步:定义数据结构
go
package main
import (
"context"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"sync"
"time"
)
// 爬取结果
type CrawlResult struct {
URL string
StatusCode int
Title string
Links []string
Error error
Duration time.Duration
}
// 爬虫主结构
type Crawler struct {
client *http.Client
maxWorkers int
maxDepth int
visited map[string]bool // 已访问的 URL
mu sync.Mutex // 保护 visited 的并发安全
}
func NewCrawler(maxWorkers, maxDepth int, timeout time.Duration) *Crawler {
return &Crawler{
client: &http.Client{
Timeout: timeout,
},
maxWorkers: maxWorkers,
maxDepth: maxDepth,
visited: make(map[string]bool),
}
}数据结构关系图:
┌──────────────────────────────────┐
│ Crawler │
│ │
│ client ────▶ http.Client │
│ └─ Timeout: 10s │
│ │
│ maxWorkers: 5 (并发数) │
│ maxDepth: 2 (最大深度) │
│ │
│ visited ────▶ map[string]bool │
│ mu ─────────▶ sync.Mutex │
│ ↑ 保护 visited │
└──────────────────────────────────┘
┌──────────────────────────────────┐
│ CrawlResult │
│ │
│ URL: "https://..." │
│ StatusCode: 200 │
│ Title: "Page Title" │
│ Links: ["url1","url2",...] │
│ Error: nil / error │
│ Duration: 150ms │
└──────────────────────────────────┘第二步:核心爬取逻辑
go
// 爬取单个页面
func (c *Crawler) fetchPage(ctx context.Context, url string) CrawlResult {
start := time.Now()
// 创建带 context 的请求(支持超时取消)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return CrawlResult{URL: url, Error: err, Duration: time.Since(start)}
}
req.Header.Set("User-Agent", "GoCrawler/1.0")
// 发起 HTTP 请求
resp, err := c.client.Do(req)
if err != nil {
return CrawlResult{URL: url, Error: err, Duration: time.Since(start)}
}
defer resp.Body.Close()
// 读取页面内容(限制最大读取量)
body, err := io.ReadAll(io.LimitReader(resp.Body, 1024*1024)) // 最多 1MB
if err != nil {
return CrawlResult{URL: url, StatusCode: resp.StatusCode, Error: err, Duration: time.Since(start)}
}
content := string(body)
return CrawlResult{
URL: url,
StatusCode: resp.StatusCode,
Title: extractTitle(content),
Links: extractLinks(content, url),
Duration: time.Since(start),
}
}
// 提取页面标题
func extractTitle(html string) string {
re := regexp.MustCompile(`<title[^>]*>(.*?)</title>`)
matches := re.FindStringSubmatch(html)
if len(matches) > 1 {
return strings.TrimSpace(matches[1])
}
return "(无标题)"
}
// 提取页面中的链接
func extractLinks(html, baseURL string) []string {
re := regexp.MustCompile(`href=["'](https?://[^"']+)["']`)
matches := re.FindAllStringSubmatch(html, -1)
var links []string
seen := make(map[string]bool)
for _, match := range matches {
link := match[1]
if !seen[link] {
seen[link] = true
links = append(links, link)
}
}
return links
}单页爬取流程:
fetchPage("https://example.com")
│
▼
┌──────────────────┐
│ 创建 HTTP 请求 │
│ + Context 绑定 │
└────────┬─────────┘
│
▼
┌──────────────────┐ 超时或取消
│ 发送请求 │────────────────▶ 返回 Error
│ client.Do(req) │
└────────┬─────────┘
│ 成功
▼
┌──────────────────┐
│ 读取 Body │
│ (限制 1MB) │
└────────┬─────────┘
│
├──────────────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ 提取 Title │ │ 提取 Links │
│ <title>...</ │ │ href="..." │
└──────┬───────┘ └──────┬───────┘
│ │
▼ ▼
┌──────────────────────────────────┐
│ CrawlResult │
│ URL + Status + Title + Links │
└──────────────────────────────────┘第三步:并发调度(核心)
go
// 检查 URL 是否已访问(并发安全)
func (c *Crawler) markVisited(url string) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.visited[url] {
return false // 已访问过
}
c.visited[url] = true
return true // 首次访问
}
// 启动并发爬取
func (c *Crawler) Crawl(ctx context.Context, seedURLs []string) []CrawlResult {
var allResults []CrawlResult
var resultsMu sync.Mutex
// 用于广度优先遍历的层级处理
currentLevel := seedURLs
for depth := 0; depth <= c.maxDepth && len(currentLevel) > 0; depth++ {
fmt.Printf("\n=== 第 %d 层 (%d 个 URL) ===\n", depth, len(currentLevel))
// 过滤已访问的 URL
var toVisit []string
for _, u := range currentLevel {
if c.markVisited(u) {
toVisit = append(toVisit, u)
}
}
if len(toVisit) == 0 {
break
}
// ====== Worker Pool 核心逻辑 ======
jobs := make(chan string, len(toVisit))
results := make(chan CrawlResult, len(toVisit))
// 启动 worker goroutines
var wg sync.WaitGroup
numWorkers := c.maxWorkers
if numWorkers > len(toVisit) {
numWorkers = len(toVisit)
}
for w := 0; w < numWorkers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for url := range jobs {
select {
case <-ctx.Done():
results <- CrawlResult{URL: url, Error: ctx.Err()}
return
default:
result := c.fetchPage(ctx, url)
results <- result
fmt.Printf(" Worker %d: %s [%d] %s (%.0fms)\n",
workerID, result.URL, result.StatusCode,
result.Title, float64(result.Duration.Milliseconds()))
}
}
}(w)
}
// 分发任务
for _, url := range toVisit {
jobs <- url
}
close(jobs)
// 等待并收集结果
go func() {
wg.Wait()
close(results)
}()
var nextLevel []string
for result := range results {
resultsMu.Lock()
allResults = append(allResults, result)
resultsMu.Unlock()
// 收集下一层的 URL
if depth < c.maxDepth {
nextLevel = append(nextLevel, result.Links...)
}
}
currentLevel = nextLevel
}
return allResults
}并发调度的完整流程:
┌─────────────────────────────────────────────────────────┐
│ Crawl() 主循环 │
│ │
│ for depth = 0; depth <= maxDepth; depth++ { │
│ │
│ 第 0 层: [seed1, seed2, seed3] │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 过滤已访问 │ ← sync.Mutex 保护 │
│ └──────┬──────┘ │
│ ▼ │
│ ┌──────────────────────────────────────┐ │
│ │ Worker Pool │ │
│ │ │ │
│ │ jobs ─────▶ Worker 0 ─────┐ │ │
│ │ chan Worker 1 ─────┤──▶ results │ │
│ │ Worker 2 ─────┘ chan │ │
│ │ │ │
│ └──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ 收集 results + 提取 next level URLs │
│ │ │
│ ▼ │
│ 第 1 层: [link1, link2, ... linkN] │
│ │ │
│ ▼ │
│ (重复上述过程...) │
│ } │
└─────────────────────────────────────────────────────────┘第四步:主函数
go
func main() {
// 创建爬虫:5个并发 worker,最大深度1层,超时10秒
crawler := NewCrawler(5, 1, 10*time.Second)
// 整体超时 30 秒
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 种子 URL
seeds := []string{
"https://go.dev",
"https://pkg.go.dev",
"https://gobyexample.com",
}
fmt.Println("开始爬取...")
start := time.Now()
results := crawler.Crawl(ctx, seeds)
fmt.Printf("\n========== 爬取完成 ==========\n")
fmt.Printf("总耗时: %v\n", time.Since(start))
fmt.Printf("爬取页面: %d\n", len(results))
// 统计
var success, failed int
for _, r := range results {
if r.Error != nil {
failed++
} else {
success++
}
}
fmt.Printf("成功: %d, 失败: %d\n", success, failed)
// 打印结果摘要
fmt.Println("\n--- 结果摘要 ---")
for _, r := range results {
status := "✓"
if r.Error != nil {
status = "✗"
}
fmt.Printf("%s [%d] %s - %s (%v)\n",
status, r.StatusCode, r.Title, r.URL, r.Duration)
}
}运行方式
bash
# 初始化项目
mkdir crawler && cd crawler
go mod init crawler
# 把上面的代码保存到 main.go,然后运行
go run main.go
# 检测数据竞争
go run -race main.go知识点回顾
这个项目用到了模块一的所有核心知识:
┌──────────────────────────────────────────────────────┐
│ 知识点 │ 在项目中的应用 │
├──────────────────────────────────────────────────────┤
│ 结构体 (struct) │ Crawler, CrawlResult │
│ 方法 (method) │ c.fetchPage(), c.Crawl() │
│ 接口 (interface) │ io.Reader, error │
│ 指针 (*T) │ func (c *Crawler) 修改状态 │
│ 切片 ([]T) │ []string, []CrawlResult │
│ Map │ visited map[string]bool │
│ goroutine │ go func() { ... } │
│ channel │ jobs, results 通道 │
│ WaitGroup │ 等待所有 worker 完成 │
│ Mutex │ 保护 visited map 并发安全 │
│ context │ 超时取消控制 │
│ defer │ resp.Body.Close(), wg.Done() │
│ 错误处理 │ if err != nil { ... } │
│ 闭包 │ go func(workerID int) { }(w) │
│ range │ for url := range jobs { } │
│ 函数选项模式 │ NewCrawler 参数化构造 │
└──────────────────────────────────────────────────────┘进一步练习
可以扩展这个爬虫项目来练习更多 Go 知识:
┌──────────────────────────────────────────────────────┐
│ 练习 1: 加上速率限制 (rate limiting) │
│ → 用 time.Ticker 控制请求频率 │
│ │
│ 练习 2: 把结果写入文件 │
│ → 用 os.Create + json.Encoder │
│ │
│ 练习 3: 加上 robots.txt 支持 │
│ → 解析 robots.txt 尊重爬取规则 │
│ │
│ 练习 4: 加上命令行参数 │
│ → 用 flag 包解析 -workers, -depth 等 │
│ │
│ 练习 5: 改成 Pipeline 模式 │
│ → 抓取 → 解析 → 存储 三个阶段用 channel 连接 │
└──────────────────────────────────────────────────────┘模块一完成! 🎉
下一个模块: 模块二:Java 高并发编程