> 技术文档 > Python实例题:基于 Python 的简单爬虫与数据可视化

Python实例题:基于 Python 的简单爬虫与数据可视化


目录

Python实例题

题目

要求:

解题思路:

代码实现:

Python实例题

题目

基于 Python 的简单爬虫与数据可视化

要求

  • 使用 Python 构建一个简单的爬虫程序,支持以下功能:
    • 从指定网站抓取数据(如新闻、商品信息等)
    • 解析 HTML 内容并提取所需信息
    • 将数据存储到文件或数据库
    • 对抓取的数据进行统计和可视化分析
  • 使用 requests 和 BeautifulSoup 进行网页爬取和解析。
  • 使用 pandas 和 matplotlib 进行数据处理和可视化。
  • 添加命令行界面,支持用户输入爬取参数。

解题思路

  • 使用 requests 发送 HTTP 请求获取网页内容。
  • 通过 BeautifulSoup 解析 HTML 内容并提取数据。
  • 使用 pandas 处理和分析数据。
  • 利用 matplotlib 和 seaborn 绘制图表
  • 实现基本的反爬策略(如随机 User-Agent、请求间隔)。

代码实现

import requestsfrom bs4 import BeautifulSoupimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsimport timeimport randomimport argparseimport jsonimport osfrom datetime import datetimeimport re# 设置中文字体plt.rcParams[\"font.family\"] = [\"SimHei\", \"WenQuanYi Micro Hei\", \"Heiti TC\"]plt.rcParams[\'axes.unicode_minus\'] = False # 解决负号显示问题class WebScraper: def __init__(self, base_url, headers=None, delay_min=1, delay_max=3): self.base_url = base_url self.headers = headers or { \'User-Agent\': \'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36\' } self.delay_min = delay_min self.delay_max = delay_max self.data = [] def fetch_page(self, url): \"\"\"发送HTTP请求获取页面内容\"\"\" try: # 添加随机延迟,避免频繁请求 delay = random.uniform(self.delay_min, self.delay_max) time.sleep(delay) response = requests.get(url, headers=self.headers) response.raise_for_status() # 检查请求是否成功 return response.text except requests.exceptions.RequestException as e: print(f\"请求错误: {e}\") return None def parse_page(self, html_content, parser=\'html.parser\'): \"\"\"解析HTML内容\"\"\" if not html_content: return None return BeautifulSoup(html_content, parser) def extract_data(self, soup): \"\"\"从页面中提取数据(需要在子类中实现)\"\"\" raise NotImplementedError(\"子类必须实现extract_data方法\") def scrape(self, pages=1, url_pattern=None): \"\"\"执行爬取任务\"\"\" self.data = [] for page in range(1, pages + 1): print(f\"正在爬取第 {page}/{pages} 页...\") if url_pattern: url = url_pattern.format(page=page) else: url = self.base_url if page == 1 else f\"{self.base_url}?page={page}\" html_content = self.fetch_page(url) if not html_content: continue soup = self.parse_page(html_content) if not soup: continue page_data = self.extract_data(soup) if page_data: self.data.extend(page_data) print(f\"第 {page} 页爬取完成,获取 {len(page_data)} 条数据\") print(f\"爬取完成,共获取 {len(self.data)} 条数据\") return self.data def save_to_csv(self, filename=None): \"\"\"将数据保存为CSV文件\"\"\" if not self.data: print(\"没有数据可保存\") return filename = filename or f\"data_{datetime.now().strftime(\'%Y%m%d_%H%M%S\')}.csv\" df = pd.DataFrame(self.data) df.to_csv(filename, index=False, encoding=\'utf-8-sig\') print(f\"数据已保存到 {filename}\") def save_to_json(self, filename=None): \"\"\"将数据保存为JSON文件\"\"\" if not self.data: print(\"没有数据可保存\") return filename = filename or f\"data_{datetime.now().strftime(\'%Y%m%d_%H%M%S\')}.json\" with open(filename, \'w\', encoding=\'utf-8\') as f: json.dump(self.data, f, ensure_ascii=False, indent=2) print(f\"数据已保存到 {filename}\")class NewsScraper(WebScraper): def __init__(self, base_url, headers=None): super().__init__(base_url, headers) def extract_data(self, soup): \"\"\"从新闻页面提取数据\"\"\" articles = [] # 查找所有新闻条目 news_items = soup.select(\'div.news-item\') # 根据实际页面结构调整选择器 for item in news_items: try: title_elem = item.select_one(\'h2.title a\') title = title_elem.text.strip() if title_elem else \"未找到标题\" link = title_elem[\'href\'] if title_elem and \'href\' in title_elem.attrs else \"#\" if link.startswith(\'/\'):  link = self.base_url + link date_elem = item.select_one(\'span.date\') date = date_elem.text.strip() if date_elem else \"未知日期\" summary_elem = item.select_one(\'p.summary\') summary = summary_elem.text.strip() if summary_elem else \"无摘要\" articles.append({  \'title\': title,  \'link\': link,  \'date\': date,  \'summary\': summary }) except Exception as e: print(f\"提取新闻条目时出错: {e}\") continue return articles def analyze(self): \"\"\"分析新闻数据\"\"\" if not self.data: print(\"没有数据可分析\") return df = pd.DataFrame(self.data) # 分析发布日期 try: # 尝试解析日期 df[\'date\'] = pd.to_datetime(df[\'date\']) df[\'day_of_week\'] = df[\'date\'].dt.day_name() # 按星期几统计新闻数量 day_counts = df[\'day_of_week\'].value_counts() plt.figure(figsize=(10, 6)) sns.barplot(x=day_counts.index, y=day_counts.values) plt.title(\'新闻发布的星期分布\') plt.xlabel(\'星期\') plt.ylabel(\'新闻数量\') plt.xticks(rotation=45) plt.tight_layout() plt.savefig(\'news_by_day.png\') plt.close() print(\"新闻发布星期分布分析完成,图表已保存为 news_by_day.png\") except Exception as e: print(f\"日期分析出错: {e}\") # 分析标题长度 df[\'title_length\'] = df[\'title\'].str.len() plt.figure(figsize=(10, 6)) sns.histplot(df[\'title_length\'], bins=20, kde=True) plt.title(\'新闻标题长度分布\') plt.xlabel(\'标题长度(字符)\') plt.ylabel(\'频率\') plt.tight_layout() plt.savefig(\'title_length_distribution.png\') plt.close() print(\"新闻标题长度分析完成,图表已保存为 title_length_distribution.png\")class ProductScraper(WebScraper): def __init__(self, base_url, headers=None): super().__init__(base_url, headers) def extract_data(self, soup): \"\"\"从商品页面提取数据\"\"\" products = [] # 查找所有商品条目 product_items = soup.select(\'div.product-item\') # 根据实际页面结构调整选择器 for item in product_items: try: name_elem = item.select_one(\'h3.product-name\') name = name_elem.text.strip() if name_elem else \"未找到商品名称\" price_elem = item.select_one(\'span.price\') price_text = price_elem.text.strip() if price_elem else \"0\" # 提取价格中的数字部分 price = re.search(r\'\\d+\\.?\\d*\', price_text) price = float(price.group(0)) if price else 0 rating_elem = item.select_one(\'div.rating\') rating = rating_elem[\'data-rating\'] if rating_elem and \'data-rating\' in rating_elem.attrs else \"0\" rating = float(rating) review_count_elem = item.select_one(\'span.review-count\') review_count_text = review_count_elem.text.strip() if review_count_elem else \"0\" # 提取评论数中的数字部分 review_count = re.search(r\'\\d+\', review_count_text) review_count = int(review_count.group(0)) if review_count else 0 products.append({  \'name\': name,  \'price\': price,  \'rating\': rating,  \'review_count\': review_count }) except Exception as e: print(f\"提取商品条目时出错: {e}\") continue return products def analyze(self): \"\"\"分析商品数据\"\"\" if not self.data: print(\"没有数据可分析\") return df = pd.DataFrame(self.data) # 价格分布 plt.figure(figsize=(10, 6)) sns.histplot(df[\'price\'], bins=20, kde=True) plt.title(\'商品价格分布\') plt.xlabel(\'价格\') plt.ylabel(\'频率\') plt.tight_layout() plt.savefig(\'price_distribution.png\') plt.close() print(\"商品价格分布分析完成,图表已保存为 price_distribution.png\") # 价格与评分关系 plt.figure(figsize=(10, 6)) sns.scatterplot(x=\'price\', y=\'rating\', data=df, alpha=0.6) plt.title(\'商品价格与评分关系\') plt.xlabel(\'价格\') plt.ylabel(\'评分\') plt.tight_layout() plt.savefig(\'price_rating_relationship.png\') plt.close() print(\"商品价格与评分关系分析完成,图表已保存为 price_rating_relationship.png\") # 相关性分析 corr_matrix = df[[\'price\', \'rating\', \'review_count\']].corr() plt.figure(figsize=(10, 8)) sns.heatmap(corr_matrix, annot=True, cmap=\'coolwarm\', fmt=\'.2f\', linewidths=0.5) plt.title(\'商品属性相关性分析\') plt.tight_layout() plt.savefig(\'correlation_analysis.png\') plt.close() print(\"商品属性相关性分析完成,图表已保存为 correlation_analysis.png\")def main(): parser = argparse.ArgumentParser(description=\'简单网页爬虫与数据分析工具\') parser.add_argument(\'--type\', type=str, choices=[\'news\', \'product\'], default=\'news\', help=\'爬取类型:news(新闻)或product(商品)\') parser.add_argument(\'--url\', type=str, required=True, help=\'目标网站URL\') parser.add_argument(\'--pages\', type=int, default=1, help=\'要爬取的页数\') parser.add_argument(\'--output\', type=str, choices=[\'csv\', \'json\'], default=\'csv\', help=\'输出格式:csv或json\') parser.add_argument(\'--delay_min\', type=float, default=1, help=\'最小请求延迟(秒)\') parser.add_argument(\'--delay_max\', type=float, default=3, help=\'最大请求延迟(秒)\') parser.add_argument(\'--analyze\', action=\'store_true\', help=\'是否进行数据分析\') args = parser.parse_args() # 创建爬虫实例 if args.type == \'news\': scraper = NewsScraper(args.url, delay_min=args.delay_min, delay_max=args.delay_max) else: scraper = ProductScraper(args.url, delay_min=args.delay_min, delay_max=args.delay_max) # 执行爬取 scraper.scrape(pages=args.pages) # 保存数据 if args.output == \'csv\': scraper.save_to_csv() else: scraper.save_to_json() # 数据分析 if args.analyze: scraper.analyze()if __name__ == \"__main__\": main()