> 文档中心 > python 70行完成requests抓取csdn阅读量.

python 70行完成requests抓取csdn阅读量.


第三方库 requests, fake_useragent

import random
import re
import logging
import time

import requests
from fake_useragent import UserAgent

日志

# Module-wide logging: timestamped INFO-level messages to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s[%(levelname)s]: %(message)s',
)
logger = logging.getLogger(__name__)

随机获取请求头User-Agent

def getHeader():
    """Build request headers: a CSDN Referer plus a randomized User-Agent."""
    headers = {
        'Referer': 'https://blog.csdn.net',
        'User-Agent': UserAgent().random,
    }
    return headers

解析阅读量

def parse_html(text: str):
    """Extract the first run of digits in *text* and return it as an int.

    Raises AttributeError when *text* contains no digits (callers catch it).

    BUG FIX: the original pattern r'([0-9]*)' matches the empty string at
    position 0 of any text, so group(1) was '' and int('') raised an
    *uncaught* ValueError for every page not starting with a digit.
    NOTE(review): the real pattern was presumably anchored on CSDN's
    read-count markup and got lost in scraping — confirm against a live page.
    """
    return int(re.search(r'([0-9]+)', text).group(1))

请求网页

def requestUrlText(url):
    """Fetch *url* and return its parsed read count, or None on failure.

    BUG FIX: the original returned False on failure, but bool is a subclass
    of int, so isinstance(False, int) is True and callers mistook a failed
    request for a read count of 0 (wiping their stored totals). None is not
    an int, so callers' isinstance checks now correctly treat it as failure.
    Also catches ValueError (raised by int() on malformed pages) and adds a
    timeout so a stalled connection cannot hang the polling loop forever.
    """
    try:
        response = requests.get(url, headers=getHeader(), timeout=10)
        if response.status_code == 200:
            return parse_html(response.text)
    except (requests.RequestException, AttributeError, ValueError):
        pass
    return None

封装为请求url类

class request_url():
    """Tracks the read count of a single blog URL."""

    def __init__(self, url: str):
        self.url = url
        self.read = 0  # last read count seen for this URL

    def succeeded(self):  # deprecated
        logging.debug(f"Request Url{self.url} successfully.")

    def _update(self):
        """Poll once; return the gain since the last poll, or 0 on failure/no change.

        BUG FIX: requestUrlText historically returned False on failure, and
        isinstance(False, int) is True (bool subclasses int), so a failed
        request was recorded as a read count of 0, producing a bogus negative
        delta and resetting self.read. Excluding bool closes that hole.
        """
        _res = requestUrlText(self.url)
        if isinstance(_res, int) and not isinstance(_res, bool):
            self.succeeded()
            _var = _res - self.read
            self.read = _res
            return _var
        return 0

    def update(self):
        """Re-poll until the read count changes, then return the gain.

        NOTE(review): this spins with no delay while the count is unchanged,
        hammering the server — presumably intentional "block until change"
        behaviour, but confirm.
        """
        while True:
            r = self._update()
            if r:
                return r

    def readnum(self) -> int:
        """Last read count recorded by _update()."""
        return self.read

封装总调用类

class request_urls():
    """Aggregates a set of request_url trackers and logs combined totals."""

    def __init__(self, *args):
        self.__urls = []
        self.add(*args)
        self.all_add = 0  # cumulative gain across all update() calls

    def add(self, *urls):
        """Register one tracker per given URL."""
        for link in urls:
            self.__urls.append(request_url(link))

    def get(self) -> int:
        """Sum the per-URL gains (each tracker blocks until its count changes)."""
        return sum(tracker.update() for tracker in self.__urls)

    def total(self) -> int:
        """Sum of the last read counts seen across all tracked URLs."""
        return sum(tracker.readnum() for tracker in self.__urls)

    def update(self) -> None:
        """Poll every URL, accumulate the gain, and log a summary line."""
        add = self.get()
        self.all_add += add
        logger.info(f"total {self.total()}({len(self.__urls)}blogs), ↑{add} [ALL {self.all_add}]")

运行与主程序

def run(urls):
    """Track the given blog URLs forever, polling at a randomized interval."""
    req = request_urls(*urls)
    req.get()  # prime the baseline counts so the first update logs a real delta


if __name__ == '__main__':
    run([
        "https://blog.csdn.net/m0_60394896/article/details/124571653?spm=1001.2014.3001.5502",
        "https://blog.csdn.net/m0_60394896/article/details/124530993",
        "https://blog.csdn.net/m0_60394896/article/details/124529941?spm=1001.2014.3001.5502",
        "https://blog.csdn.net/m0_60394896/article/details/124519531",
        "https://blog.csdn.net/m0_60394896/article/details/124508776",
        "https://blog.csdn.net/m0_60394896/article/details/124361092",
        "https://blog.csdn.net/m0_60394896/article/details/124094245",
        "https://blog.csdn.net/m0_60394896/article/details/124034831",
        "https://blog.csdn.net/m0_60394896/article/details/124033445",
        "https://blog.csdn.net/m0_60394896/article/details/123981398",
        "https://blog.csdn.net/m0_60394896/article/details/123772011",
        "https://blog.csdn.net/m0_60394896/article/details/123583566",
        "https://blog.csdn.net/m0_60394896/article/details/122505828",
        "https://blog.csdn.net/m0_60394896/article/details/122371110",
    ])

全部代码

# Complete program: poll CSDN blog read counts and log the gains.
import random
import re
import logging
import time

import requests
from fake_useragent import UserAgent

logging.basicConfig(level=logging.INFO, format='%(asctime)s[%(levelname)s]: %(message)s')
logger = logging.getLogger(__name__)


def getHeader():
    """Build request headers: a CSDN Referer plus a randomized User-Agent."""
    return {'Referer': 'https://blog.csdn.net', 'User-Agent': UserAgent().random}


def parse_html(text: str):
    """Extract the first run of digits in *text* as the read count.

    BUG FIX: the scraped original used r'([0-9]*)', which matches the empty
    string at position 0 and made int('') raise an uncaught ValueError.
    NOTE(review): the real pattern was presumably anchored on CSDN's
    read-count markup — confirm against a live page.
    """
    return int(re.search(r'([0-9]+)', text).group(1))


def requestUrlText(url):
    """Fetch *url*; return the parsed read count, or None on any failure.

    Returns None rather than False: bool subclasses int, so a False return
    would pass callers' isinstance(..., int) checks as a count of 0.
    """
    try:
        response = requests.get(url, headers=getHeader(), timeout=10)
        if response.status_code == 200:
            return parse_html(response.text)
    except (requests.RequestException, AttributeError, ValueError):
        pass
    return None


class request_url():
    """Tracks the read count of a single blog URL."""

    def __init__(self, url: str):
        self.url = url
        self.read = 0  # last read count seen

    def succeeded(self):  # deprecated
        logging.debug(f"Request Url{self.url} successfully.")

    def _update(self):
        """Poll once; return the gain since the last poll, 0 on failure/no change."""
        _res = requestUrlText(self.url)
        # Exclude bool: a legacy False failure value must not read as count 0.
        if isinstance(_res, int) and not isinstance(_res, bool):
            self.succeeded()
            _var = _res - self.read
            self.read = _res
            return _var
        return 0

    def update(self):
        """Re-poll until the read count changes, then return the gain."""
        while True:
            r = self._update()
            if r:
                return r

    def readnum(self) -> int:
        return self.read


class request_urls():
    """Aggregates request_url trackers and logs combined totals."""

    def __init__(self, *args):
        self.__urls = []
        self.add(*args)
        self.all_add = 0  # cumulative gain across all update() calls

    def add(self, *urls):
        for url in urls:
            self.__urls.append(request_url(url))

    def get(self) -> int:
        return sum(u.update() for u in self.__urls)

    def total(self) -> int:
        return sum(u.readnum() for u in self.__urls)

    def update(self) -> None:
        add = self.get()
        self.all_add += add
        logger.info(f"total {self.total()}({len(self.__urls)}blogs), ↑{add} [ALL {self.all_add}]")


def run(urls):
    """Track the given blog URLs forever."""
    req = request_urls(*urls)
    req.get()  # prime the baseline counts
    while True:
        req.update()
        time.sleep(60 * 60 * 2)  # poll automatically once every two hours


if __name__ == '__main__':
    run([
        "https://blog.csdn.net/m0_60394896/article/details/124571653?spm=1001.2014.3001.5502",
        "https://blog.csdn.net/m0_60394896/article/details/124530993",
        "https://blog.csdn.net/m0_60394896/article/details/124529941?spm=1001.2014.3001.5502",
        "https://blog.csdn.net/m0_60394896/article/details/124519531",
        "https://blog.csdn.net/m0_60394896/article/details/124508776",
        "https://blog.csdn.net/m0_60394896/article/details/124361092",
        "https://blog.csdn.net/m0_60394896/article/details/124094245",
        "https://blog.csdn.net/m0_60394896/article/details/124034831",
        "https://blog.csdn.net/m0_60394896/article/details/124033445",
        "https://blog.csdn.net/m0_60394896/article/details/123981398",
        "https://blog.csdn.net/m0_60394896/article/details/123772011",
        "https://blog.csdn.net/m0_60394896/article/details/123583566",
        "https://blog.csdn.net/m0_60394896/article/details/122505828",
        "https://blog.csdn.net/m0_60394896/article/details/122371110",
    ])

什么? 刷取 CSDN 阅读量? 那是不可能的.
CSDN 主要靠以下几种方式判断你是否在刷请求(requests):

项目 Value
1 时间戳 time.time() 时间计算是一个重要方式: 如果你在过短时间内请求多次,则只视为增加一次阅读量
2 请求头 header 这是大多数网站返回结果考虑的因素
3 Referer 你从哪里来?
4 Cookie + IP 单个IP或者是cookie文件 请求次数过多,也会减少阅读量,但其实这一步可以用代理IP 但效果不怎么样
5 用户信息 如(4), 单个用户只能点赞收藏一次

阅读量并不能代表什么,点赞、收藏等综合指标上去才有意义,所以有时间还是要想想发布一些优质的博文.

网赚站