> 技术文档 > 使用AKShare获取历史所有股票的筹码分布

使用AKShare获取历史所有股票的筹码分布


项目地址

AKShare 项目概览 — AKShare 1.17.26 文档
https://github.com/akfamily/akshare

代码

# -*- coding: utf-8 -*-\"\"\"抓取全部 A 股最新筹码分布(CYQ)并保存为 CSV依赖: pip install akshare pandas tqdm\"\"\"import akshare as akimport pandas as pdimport randomimport timefrom datetime import datetimefrom tqdm import tqdmUA = (\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) \" \"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36\")def fetch_cyq_safe(code, retry=3): \"\"\"获取单只股票筹码分布,带重试\"\"\" for i in range(retry): try: df = ak.stock_cyq_em(symbol=code) if not df.empty: # 添加股票代码列便于合并 df.insert(0, \"code\", code) return df except Exception as e: if i < retry - 1: time.sleep(random.uniform(1.5, 3)) else: return None return Nonedef main(): # 1. 获取 A 股代码列表 spot = ak.stock_zh_a_spot_em() codes = spot[\"代码\"].tolist() print(f\"共 {len(codes)} 只股票,开始抓取筹码分布...\") success, fail = 0, [] for code in tqdm(codes, desc=\"CYQ\"): df = fetch_cyq_safe(code) if df is not None: df.to_csv(f\"cyq_{code}.csv\", index=False, encoding=\"utf-8-sig\") success += 1 else: fail.append(code) # 批次间隔 time.sleep(random.uniform(1.5, 3)) # 2. 记录失败代码 if fail: pd.Series(fail, name=\"fail_code\").to_csv(\"fail_codes.txt\", index=False, header=False) print(f\"\\n成功 {success} 只,失败 {len(fail)} 只,已写入 fail_codes.txt\") else: print(\"\\n全部抓取完成!\")if __name__ == \"__main__\": main()