> 文档中心 > 人人贷散标爬虫实例进阶-使用异步io

人人贷散标爬虫实例进阶-使用异步io

 写在前面:

1.该代码写于2020年4月5日,经查看,人人贷网站前端已出现变化,所以该代码无法直接运行。本文代码是对下面这篇旧版文章中代码的改进:人人贷散标爬虫实例_小zhan柯基-CSDN博客_人人贷爬虫

2.由于爬取数据量较大,达到几十万条,因此需要考虑健壮性与爬取速度。对于爬取速度,由于requests库采用阻塞式访问,每分钟只可爬取200条贷款记录,而使用异步IO库aiohttp,则爬取速度可以提升6~7倍,每分钟可爬取1200条左右的贷款记录。

3.虽代码无法运行,但使用aiohttp的过程仍有一定借鉴意义。

4.参考资料:异步IO - 廖雪峰的官方网站

# -- Multiprocessing (a process Pool would work equally well) --
from multiprocessing import Process, Queue
import time

# -- Selenium: simulated login to refresh cookies --
from selenium import webdriver
from selenium.webdriver.common.keys import Keys

# -- Scraping / parsing --
import requests
from bs4 import BeautifulSoup
import re
import json
import csv

# -- Async IO --
import asyncio
import aiohttp
try:
    from aiohttp import ClientError
except ImportError:  # FIX: was a bare `except:`; older aiohttp exposes a different name
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError

# Extract the JSON payload of the inline "var info = ...var detail = ..." script
# embedded in each loan-detail page.
# pattern = re.compile(r'.*var info = (.*?)截至借款前5年内借款人.*')
pattern = re.compile(r'.*var info = (.*?)var detail = .*')


def timestamp_datetime(value):
    """Convert a UNIX timestamp (seconds) into a 'YYYY-mm-dd HH:MM:SS' local-time string."""
    fmt = '%Y-%m-%d %H:%M:%S'  # renamed from `format` to avoid shadowing the builtin
    return time.strftime(fmt, time.localtime(value))


def async_get_new_cookie():
    """Log in to renrendai.com with a headless Chrome and return the cookies as a dict.

    Returns:
        dict: cookie name -> value, suitable for aiohttp's ``cookies=`` argument.

    NOTE(review): account and password are hard-coded placeholders below — move them
    to configuration/environment variables. Captcha is not handled.
    """
    print("******************正在登录并更新cookie******************")
    opt = webdriver.ChromeOptions()
    opt.add_argument('--headless')  # portable replacement for the deprecated set_headless()
    driver = webdriver.Chrome(options=opt)
    driver.maximize_window()
    driver.get('https://www.renrendai.com/login')
    # Switch to the password-login tab (captcha case is not considered).
    driver.find_element_by_xpath('//span[@class="tab-password"]').click()
    print("******************输入手机号中******************")
    driver.find_element_by_xpath('//input[@placeholder="手机号/邮箱"]').send_keys('188****9029', Keys.TAB)
    print("******************输入密码中******************")
    # Tick "remember me" by rewriting the span's class attribute via JS.
    span_click = driver.find_element_by_xpath('//span[@id="rememberme-login"]')
    driver.execute_script("arguments[0].setAttribute(arguments[1], arguments[2])", span_click, "class", 'is-choose')
    driver.find_element_by_xpath('//input[@name="j_password"]').send_keys('zzz*****!!', Keys.ENTER)
    time.sleep(15)  # wait for the post-login cookies to be written
    cookies = {item["name"]: item["value"] for item in driver.get_cookies()}
    print("******************登录完毕******************")
    driver.quit()
    return cookies


async def asyncGetSingleHtmlText(url_borrower, cookies, q):
    """Fetch one loan-detail page asynchronously and push its HTML text onto queue ``q``.

    Retries once on any failure; a failure on the second attempt is logged and
    swallowed so that one bad URL never aborts the whole batch.
    """
    my_header = {
        "User-Agent": "User-Agent:Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.11 (KHTML, like Gecko)Ubuntu/11.10 Chromium/27.0.1453.93 Chrome/27.0.1453.93 Safari/537.36",
    }
    conn = aiohttp.TCPConnector(verify_ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        for attempt in (1, 2):  # two attempts, same semantics as the original nested try/except
            try:
                async with session.get(url_borrower, cookies=cookies, timeout=100,
                                       headers=my_header, allow_redirects=False) as result:
                    q.put(await result.text())
                return
            except Exception as e:
                if attempt == 2:
                    print("Exception in asyncGetSingleHtmlText()", e.args)


def asyncgetHtmlText(q, url_list_borrower):
    """Producer process body: fetch all URLs in batches of 500, re-logging-in per batch.

    Args:
        q: multiprocessing.Queue the fetched HTML strings are pushed onto.
        url_list_borrower: list of loan-detail URLs this process is responsible for.
    """
    BATCH_TEST_SIZE = 500
    count = len(url_list_borrower)
    # Each child process needs its own event loop (the forked parent's loop state
    # must not be reused).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        for start in range(0, count, BATCH_TEST_SIZE):
            stop = min(start + BATCH_TEST_SIZE, count)
            cookies = async_get_new_cookie()  # fresh login for every batch
            tasks = [asyncGetSingleHtmlText(url_borrower, cookies, q)
                     for url_borrower in url_list_borrower[start:stop]]
            loop.run_until_complete(asyncio.wait(tasks))
    except Exception as e:
        print('Exception in asyncgetHtmlText', e.args)


def _extract_info_dict(html_text):
    """Parse one page's HTML and return the embedded loan-info dict (raises on failure).

    The loan JSON lives in the inline <script> immediately following the external
    layout_c0258d7.js script tag; escaped characters are normalized before json.loads.
    """
    bs = BeautifulSoup(html_text, "html.parser")
    info = str(bs.find("script", {"src": "/ps/static/common/page/layout_c0258d7.js"}).next_sibling.string).replace("\n", "")
    infoProcess = (pattern.findall(info)[0]
                   .replace('\\u0022', '"')
                   .replace("\\u002D", "-")
                   .replace("'", "")
                   .replace("\\u005C", "\\")
                   .replace(";", ""))
    return json.loads(infoProcess)


def asyncParseAndSave(q):
    """Consumer process body: pull raw HTML off ``q``, extract the loan record, append to Mydata.csv.

    Runs forever; the parent is expected to terminate this process once all
    producers have finished.
    """
    while True:
        html_text = q.get(True)
        info_dict = None  # FIX: ensure the except clause can report without a NameError
        try:
            info_dict = _extract_info_dict(html_text)
            if "gender" not in info_dict["borrower"]:
                print("gender not in borrower'key", info_dict["loan"]["loanId"])
                continue
            # newline="" per the csv module docs; the with-block closes the file
            # (the original redundant csvfile.close() was removed).
            with open("Mydata.csv", "a", newline="") as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow([
                    info_dict["loan"]["loanId"],
                    timestamp_datetime(int(info_dict["loan"]["openTime"]) / 1000),
                    info_dict["loan"]["months"], info_dict["loan"]["leftMonths"],
                    info_dict["loan"]["amount"], info_dict["userLoanRecord"]["notPayInterest"],
                    info_dict["productRepayType"], info_dict["loan"]["borrowType"],
                    info_dict["loan"]["interest"], info_dict["borrower"]["gender"],
                    info_dict["borrower"]["homeTown"], info_dict["borrower"]["birthDay"],
                    info_dict["borrower"]["graduation"], info_dict["borrower"]["office"],
                    info_dict["borrower"]["officeDomain"], info_dict["borrower"]["officeScale"],
                    info_dict["borrower"]["position"], info_dict["borrower"]["salary"],
                    info_dict["borrower"]["carLoan"], info_dict["borrower"]["hasCar"],
                    info_dict["borrower"]["marriage"], info_dict["borrower"]["houseLoan"],
                    info_dict["borrower"]["hasHouse"], info_dict["borrower"]["creditLevel"],
                    info_dict["loan"]["overDued"], info_dict["userLoanRecord"]["totalCount"],
                    info_dict["userLoanRecord"]["successCount"],
                    info_dict["userLoanRecord"]["alreadyPayCount"],
                    info_dict["userLoanRecord"]["failedCount"],
                    info_dict["loan"]["description"],
                ])
            print("id:{} has done".format(info_dict["loan"]["loanId"]))
        except Exception as e:
            loan_id = info_dict["loan"]["loanId"] if isinstance(info_dict, dict) and "loan" in info_dict else "<unparsed>"
            print("Exception in parser:", loan_id)
            continue


def get_new_cookie(session):
    """Refresh ``session``'s cookie jar by logging in through a headless Chrome.

    Blocking (requests-based) counterpart of async_get_new_cookie(); mutates the
    passed requests.Session in place.
    """
    print("******************正在登录并更新cookie******************")
    opt = webdriver.ChromeOptions()
    opt.add_argument('--headless')  # portable replacement for the deprecated set_headless()
    driver = webdriver.Chrome(options=opt)
    driver.maximize_window()
    driver.get('https://www.renrendai.com/login')
    # Switch to the password-login tab (captcha case is not considered).
    driver.find_element_by_xpath('//span[@class="tab-password"]').click()
    print("******************输入手机号中******************")
    driver.find_element_by_xpath('//input[@placeholder="手机号/邮箱"]').send_keys('yourAccountName', Keys.TAB)
    print("******************输入密码中******************")
    driver.find_element_by_xpath('//input[@name="j_password"]').send_keys('yourPwd', Keys.ENTER)
    time.sleep(15)  # wait for the post-login cookies to be written
    c = requests.cookies.RequestsCookieJar()
    for item in driver.get_cookies():
        c.set(item["name"], item["value"])
    session.cookies.update(c)  # load cookies into the caller's session
    print("******************登录完毕******************")
    driver.quit()


def getHtmlText(q, url_list):
    """Blocking producer (requests-based alternative): fetch pages one by one.

    HTML texts are batched 10 at a time before being put on ``q``; cookies are
    refreshed every 250 requests and after more than 20 accumulated failures
    (repeated failures usually mean the session has expired).
    """
    htmlTextList = []
    session = requests.Session()
    get_new_cookie(session)
    exception_count = 0
    my_header = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36",
    }
    for index, url in enumerate(url_list):
        try:
            res = session.get(url, timeout=10, headers=my_header)
            res.raise_for_status()
            res.encoding = res.apparent_encoding
            htmlTextList.append(res.text)
            print("request:" + str(index))
            if (index + 1) % 250 == 0:
                print(res.text)
                get_new_cookie(session)
            if (index + 1) % 10 == 0:
                q.put(htmlTextList)
                htmlTextList = []
        except Exception as e:
            print("Exception in request:", index)
            exception_count += 1
            if exception_count > 20:
                print("exception_count>20")  # FIX: message said 50 while the threshold is 20
                time.sleep(60)
                get_new_cookie(session)
                exception_count = 0


def parseAndSave(q):
    """Consumer for getHtmlText(): parse each batched HTML list and append rows to all.csv."""
    while True:
        html_text_list = q.get(True)
        for index, html_text in enumerate(html_text_list):
            info_dict = None  # FIX: ensure the except clause can report without a NameError
            try:
                info_dict = _extract_info_dict(html_text)
                if "gender" not in info_dict["borrower"]:
                    print("gender not in borrower'key,index:", index)
                    continue
                with open("all.csv", "a", newline="") as csvfile:
                    writer = csv.writer(csvfile)
                    writer.writerow([
                        info_dict["loan"]["loanId"], info_dict["loan"]["months"],
                        info_dict["loan"]["leftMonths"], info_dict["loan"]["amount"],
                        info_dict["userLoanRecord"]["notPayInterest"],
                        info_dict["productRepayType"], info_dict["loan"]["borrowType"],
                        info_dict["loan"]["interest"], info_dict["borrower"]["gender"],
                        info_dict["borrower"]["homeTown"], info_dict["borrower"]["birthDay"],
                        info_dict["borrower"]["graduation"], info_dict["borrower"]["office"],
                        info_dict["borrower"]["officeDomain"], info_dict["borrower"]["officeScale"],
                        info_dict["borrower"]["position"], info_dict["borrower"]["salary"],
                        info_dict["borrower"]["carLoan"], info_dict["borrower"]["hasCar"],
                        info_dict["borrower"]["marriage"], info_dict["borrower"]["houseLoan"],
                        info_dict["borrower"]["hasHouse"], info_dict["borrower"]["creditLevel"],
                        info_dict["loan"]["overDued"], info_dict["userLoanRecord"]["totalCount"],
                        info_dict["userLoanRecord"]["overdueCount"],
                        info_dict["loan"]["description"],
                    ])
                print("id:{} has done".format(info_dict["loan"]["loanId"]))
            except Exception as e:
                loan_id = info_dict["loan"]["loanId"] if isinstance(info_dict, dict) and "loan" in info_dict else "<unparsed>"
                print("Exception in parser:", loan_id)
                continue


if __name__ == '__main__':
    print("******************begining******************")
    # 1. Build the work lists: four slices of 25,000 consecutive loan ids
    #    starting at id 7,000,000.
    init_url_borrower = "https://www.renrendai.com/loan-{}.html"
    n = 700 * 10000
    url_list1 = [init_url_borrower.format(i + n + 0) for i in range(25000)]
    url_list2 = [init_url_borrower.format(i + n + 25000) for i in range(25000)]
    url_list3 = [init_url_borrower.format(i + n + 50000) for i in range(25000)]
    url_list4 = [init_url_borrower.format(i + n + 75000) for i in range(25000)]
    # 2. One shared queue, four producer processes, one consumer process.
    q = Queue()
    pw1 = Process(target=asyncgetHtmlText, args=(q, url_list1))
    pw2 = Process(target=asyncgetHtmlText, args=(q, url_list2))
    pw3 = Process(target=asyncgetHtmlText, args=(q, url_list3))
    pw4 = Process(target=asyncgetHtmlText, args=(q, url_list4))
    pr = Process(target=asyncParseAndSave, args=(q,))
    # FIX: pw2-pw4 were joined below but their start() calls were commented out,
    # and Process.join() on a never-started process raises AssertionError.
    pw1.start()
    pw2.start()
    pw3.start()
    pw4.start()
    pr.start()
    # 3. Wait for every producer to finish, then stop the consumer, which
    #    otherwise blocks forever on q.get().
    pw1.join()
    pw2.join()
    pw3.join()
    pw4.join()
    pr.terminate()
    print("******************everything is ok,please terminate ******************")