Renrendai (人人贷) Loan-Listing Crawler, Advanced: Using Asynchronous I/O
Before we start:
1. This code was written on April 5, 2020. The Renrendai front end has since changed, so the code no longer runs as-is. It is an improved version of the earlier post 人人贷散标爬虫实例_小zhan柯基-CSDN博客_人人贷爬虫.
2. The dataset is large, several hundred thousand records, so both robustness and crawl speed matter. On speed: the requests library blocks on every call and manages only about 200 loan records per minute, while the asynchronous I/O stack (asyncio + aiohttp) raises throughput roughly 6-7x, to about 1,200 records per minute; a minimal sketch of the difference follows this list.
3. Even though the code can no longer run, the way aiohttp is used here is still worth borrowing from.
4. Reference: 异步IO - 廖雪峰的官方网站 (the asyncio tutorial on Liao Xuefeng's site).
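
To make point 2 concrete, here is a minimal, self-contained sketch (not part of the crawler below) that contrasts a blocking requests loop with the same batch fetched concurrently through aiohttp. The test URL and batch size are placeholders chosen only for illustration; the real crawler adds cookies, custom headers, retries, and multiprocessing on top of the same idea.

import asyncio
import time

import aiohttp
import requests

URLS = ["https://httpbin.org/delay/1"] * 10  # placeholder URLs; each takes about 1 s to answer


def fetch_blocking(urls):
    # requests: each response must arrive before the next request is sent
    with requests.Session() as session:
        return [session.get(u, timeout=30).text for u in urls]


async def fetch_async(urls):
    # aiohttp: all requests are in flight at the same time,
    # so the total time is roughly that of the slowest single response
    async with aiohttp.ClientSession() as session:
        async def fetch_one(u):
            async with session.get(u, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                return await resp.text()
        return await asyncio.gather(*(fetch_one(u) for u in urls))


if __name__ == "__main__":
    t0 = time.time()
    fetch_blocking(URLS)
    print("blocking: %.1f s" % (time.time() - t0))  # about 10 x 1 s

    t0 = time.time()
    asyncio.run(fetch_async(URLS))
    print("async:    %.1f s" % (time.time() - t0))  # about 1 s plus overhead

The speed-up claimed in point 2 comes from exactly this effect: while one response is still on the wire, the event loop already has all the other requests out.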
# Multiprocessing modules (a process Pool would also work here)
from multiprocessing import Process, Queue
import time

# Selenium: simulated login to refresh cookies
from selenium import webdriver
from selenium.webdriver.common.keys import Keys

# Crawling / parsing modules
import requests
from bs4 import BeautifulSoup
import re
import json
import csv

# Asynchronous IO
import asyncio
import aiohttp
try:
    from aiohttp import ClientError
except ImportError:
    from aiohttp import ClientProxyConnectionError as ProxyConnectionError

# Extract the "var info = ..." JSON blob embedded in the loan page's inline script
# pattern = re.compile(r'.*var info = (.*?)截至借款前5年内借款人.*')
pattern = re.compile(r'.*var info = (.*?)var detail = .*')


def timestamp_datetime(value):
    # Convert a Unix timestamp (seconds) to a "YYYY-MM-DD HH:MM:SS" string
    format = '%Y-%m-%d %H:%M:%S'
    value = time.localtime(value)
    dt = time.strftime(format, value)
    return dt


def async_get_new_cookie():
    print("****************** logging in and refreshing cookies ******************")
    # Headless browser options
    opt = webdriver.ChromeOptions()
    opt.set_headless()
    driver = webdriver.Chrome(options=opt)
    # driver = webdriver.Chrome()
    driver.maximize_window()
    driver.get('https://www.renrendai.com/login')  # captchas are not handled here
    # print(driver.page_source)
    driver.find_element_by_xpath('//span[@class="tab-password"]').click()  # switch to the password-login tab
    print("****************** entering phone number ******************")
    driver.find_element_by_xpath('//input[@placeholder="手机号/邮箱"]').send_keys('188****9029', Keys.TAB)  # account name
    print("****************** entering password ******************")
    span_click = driver.find_element_by_xpath('//span[@id="rememberme-login"]')
    driver.execute_script("arguments[0].setAttribute(arguments[1], arguments[2])", span_click, "class", 'is-choose')
    driver.find_element_by_xpath('//input[@name="j_password"]').send_keys('zzz*****!!', Keys.ENTER)  # password + Enter
    time.sleep(15)  # wait for the cookies to be set
    cookies_get = driver.get_cookies()
    # c = requests.cookies.RequestsCookieJar()
    cookies = {}
    for item in cookies_get:
        cookies[item["name"]] = item["value"]
    print("****************** login finished ******************")
    driver.quit()
    return cookies


async def asyncGetSingleHtmlText(url_borrower, cookies, q):
    my_header = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.11 (KHTML, like Gecko) Ubuntu/11.10 Chromium/27.0.1453.93 Chrome/27.0.1453.93 Safari/537.36",
    }
    conn = aiohttp.TCPConnector(verify_ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
        try:
            async with session.get(url_borrower, cookies=cookies, timeout=100, headers=my_header, allow_redirects=False) as result:
                result_text = await result.text()
                q.put(result_text)
        except Exception:
            # Retry once, then give up on this URL
            try:
                async with session.get(url_borrower, cookies=cookies, headers=my_header, timeout=100, allow_redirects=False) as result:
                    result_text = await result.text()
                    q.put(result_text)
            except Exception as e:
                print("Exception in asyncGetSingleHtmlText()", e.args)
        # except (ClientError, aiohttp.client_exceptions.ClientConnectorError, asyncio.TimeoutError, AttributeError):
        #     print("Exception in asyncGetSingleHtmlText()")


# Producer process (async version): fetch pages in batches and push the HTML into the queue
def asyncgetHtmlText(q, url_list_borrower):
    BATCH_TEST_SIZE = 500
    # cookies = async_get_new_cookie()
    count = len(url_list_borrower)
    try:
        for i in range(0, count, BATCH_TEST_SIZE):
            start = i
            stop = min(i + BATCH_TEST_SIZE, count)
            loop = asyncio.get_event_loop()
            cookies = async_get_new_cookie()  # refresh the login cookie once per batch
            tasks = [asyncGetSingleHtmlText(url_borrower, cookies, q) for url_borrower in url_list_borrower[start:stop]]
            loop.run_until_complete(asyncio.wait(tasks))
            # print("****************** waiting 60 s ******************")
            # time.sleep(60)
    except Exception as e:
        print('Exception in asyncgetHtmlText', e.args)


# Consumer process (async version): parse the queued HTML and append rows to the CSV file
def asyncParseAndSave(q):
    while True:
        html_text = q.get(True)
        try:
            bs = BeautifulSoup(html_text, "html.parser")
            info = str(bs.find("script", {"src": "/ps/static/common/page/layout_c0258d7.js"}).next_sibling.string).replace("\n", "")
            infoProcess = pattern.findall(info)[0].encode('utf-8').decode("utf-8").replace('\\u0022', '"').replace("\\u002D", "-").replace("'", "").replace("\\u005C", "\\").replace(";", "")  # + '"}}'
            info_dict = json.loads(infoProcess)
            # print(info_dict)
            if "gender" not in info_dict["borrower"]:
                print("gender not in borrower's keys", info_dict["loan"]["loanId"])
                continue
            # invester_dict = html_text[1]
            # temp = []
            # if invester_dict["status"] == 0 and invester_dict["data"]["joinCount"] > 0:
            #     for borrower in invester_dict["list"]:
            #         temp.append([borrower["amount"], timestamp_datetime(int(borrower["lendTime"]) / 1000)])
            # else:
            #     temp = ""
            # print(temp)
            with open("Mydata.csv", "a") as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow([info_dict["loan"]["loanId"],
                                 timestamp_datetime(int(info_dict["loan"]["openTime"]) / 1000),
                                 info_dict["loan"]["months"], info_dict["loan"]["leftMonths"],
                                 info_dict["loan"]["amount"], info_dict["userLoanRecord"]["notPayInterest"],
                                 info_dict["productRepayType"], info_dict["loan"]["borrowType"],
                                 info_dict["loan"]["interest"], info_dict["borrower"]["gender"],
                                 info_dict["borrower"]["homeTown"], info_dict["borrower"]["birthDay"],
                                 info_dict["borrower"]["graduation"], info_dict["borrower"]["office"],
                                 info_dict["borrower"]["officeDomain"], info_dict["borrower"]["officeScale"],
                                 info_dict["borrower"]["position"], info_dict["borrower"]["salary"],
                                 info_dict["borrower"]["carLoan"], info_dict["borrower"]["hasCar"],
                                 info_dict["borrower"]["marriage"], info_dict["borrower"]["houseLoan"],
                                 info_dict["borrower"]["hasHouse"], info_dict["borrower"]["creditLevel"],
                                 info_dict["loan"]["overDued"], info_dict["userLoanRecord"]["totalCount"],
                                 info_dict["userLoanRecord"]["successCount"], info_dict["userLoanRecord"]["alreadyPayCount"],
                                 info_dict["userLoanRecord"]["failedCount"], info_dict["loan"]["description"],
                                 ])
                print("id:{} has done".format(info_dict["loan"]["loanId"]))
        except Exception as e:
            print("Exception in parser:", e.args)
            continue


# Refresh cookies periodically (blocking / requests version)
def get_new_cookie(session):
    print("****************** logging in and refreshing cookies ******************")
    # Headless browser options
    opt = webdriver.ChromeOptions()
    opt.set_headless()
    driver = webdriver.Chrome(options=opt)
    driver.maximize_window()
    driver.get('https://www.renrendai.com/login')  # captchas are not handled here
    driver.find_element_by_xpath('//span[@class="tab-password"]').click()  # switch to the password-login tab
    print("****************** entering phone number ******************")
    driver.find_element_by_xpath('//input[@placeholder="手机号/邮箱"]').send_keys('yourAccountName', Keys.TAB)  # account name
    print("****************** entering password ******************")
    driver.find_element_by_xpath('//input[@name="j_password"]').send_keys('yourPwd', Keys.ENTER)  # password + Enter
    time.sleep(15)  # wait for the cookies to be set
    cookies = driver.get_cookies()
    c = requests.cookies.RequestsCookieJar()
    for item in cookies:
        c.set(item["name"], item["value"])
    session.cookies.update(c)  # load the cookies into the requests session
    print("****************** login finished ******************")
    driver.quit()


# Producer process (blocking version): fetch pages with requests and push batches into the queue
def getHtmlText(q, url_list):
    htmlTextList = []
    session = requests.Session()
    get_new_cookie(session)
    exception_count = 0
    my_header = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36",
    }
    for index, url in enumerate(url_list):
        try:
            res = session.get(url, timeout=10, headers=my_header)
            res.raise_for_status()
            res.encoding = res.apparent_encoding
            htmlTextList.append(res.text)
            print("request:" + str(index))
            if (index + 1) % 250 == 0:
                print(res.text)
                get_new_cookie(session)  # refresh the login cookie every 250 requests
            if (index + 1) % 10 == 0:
                q.put(htmlTextList)  # hand a batch of 10 pages to the parser process
                htmlTextList = []
        except Exception as e:
            print("Exception in request:", index)
            exception_count += 1
            # Repeated failures are most likely an expired cookie: wait, then log in again
            if exception_count > 20:
                print("exception_count > 20")
                time.sleep(60)
                get_new_cookie(session)
                exception_count = 0


# Consumer process (blocking version): parse the queued HTML and append rows to the CSV file
def parseAndSave(q):
    while True:
        html_text_list = q.get(True)
        for index, html_text in enumerate(html_text_list):
            try:
                bs = BeautifulSoup(html_text, "html.parser")
                info = str(bs.find("script", {"src": "/ps/static/common/page/layout_c0258d7.js"}).next_sibling.string).replace("\n", "")
                infoProcess = pattern.findall(info)[0].encode('utf-8').decode("utf-8").replace('\\u0022', '"').replace("\\u002D", "-").replace("'", "").replace("\\u005C", "\\").replace(";", "")  # + '"}}'
                # print(infoProcess)
                info_dict = json.loads(infoProcess)
                # print(info_dict)
                if "gender" not in info_dict["borrower"]:
                    print("gender not in borrower's keys, index:", index)
                    continue
                with open("all.csv", "a") as csvfile:
                    writer = csv.writer(csvfile)
                    writer.writerow([info_dict["loan"]["loanId"], info_dict["loan"]["months"], info_dict["loan"]["leftMonths"],
                                     info_dict["loan"]["amount"], info_dict["userLoanRecord"]["notPayInterest"],
                                     info_dict["productRepayType"], info_dict["loan"]["borrowType"],
                                     info_dict["loan"]["interest"], info_dict["borrower"]["gender"],
                                     info_dict["borrower"]["homeTown"], info_dict["borrower"]["birthDay"],
                                     info_dict["borrower"]["graduation"], info_dict["borrower"]["office"],
                                     info_dict["borrower"]["officeDomain"], info_dict["borrower"]["officeScale"],
                                     info_dict["borrower"]["position"], info_dict["borrower"]["salary"],
                                     info_dict["borrower"]["carLoan"], info_dict["borrower"]["hasCar"],
                                     info_dict["borrower"]["marriage"], info_dict["borrower"]["houseLoan"],
                                     info_dict["borrower"]["hasHouse"], info_dict["borrower"]["creditLevel"],
                                     info_dict["loan"]["overDued"], info_dict["userLoanRecord"]["totalCount"],
                                     info_dict["userLoanRecord"]["overdueCount"], info_dict["loan"]["description"],
                                     ])
                    print("id:{} has done".format(info_dict["loan"]["loanId"]))
            except Exception as e:
                print("Exception in parser:", e.args)
                continue


if __name__ == '__main__':
    print("****************** beginning ******************")
    # 1. Prepare the URL list to crawl
    init_url_borrower = "https://www.renrendai.com/loan-{}.html"
    n = 700 * 10000  # 600 * 10000  # 660 650 640 630 620 610
    # n2 = 620 * 10000
    # n3 = 610 * 10000
    # n4 = 600 * 10000
    url_list1 = [init_url_borrower.format(i + n + 0000) for i in range(25000)]
    url_list2 = [init_url_borrower.format(i + n + 25000) for i in range(25000)]
    url_list3 = [init_url_borrower.format(i + n + 50000) for i in range(25000)]
    url_list4 = [init_url_borrower.format(i + n + 75000) for i in range(25000)]

    # 2. Set up the parent and child processes
    # 2.1 The parent process creates the Queue and passes it to every child process
    q = Queue()
    pw1 = Process(target=asyncgetHtmlText, args=(q, url_list1))
    pw2 = Process(target=asyncgetHtmlText, args=(q, url_list2))
    pw3 = Process(target=asyncgetHtmlText, args=(q, url_list3))
    pw4 = Process(target=asyncgetHtmlText, args=(q, url_list4))
    pr = Process(target=asyncParseAndSave, args=(q,))

    # 2.2 Start the producer processes pw* and the consumer process pr
    pw1.start()
    # pw2.start()
    # pw3.start()
    # pw4.start()
    pr.start()

    # 2.3 Wait for the producers to finish; pr loops forever, so terminate it manually afterwards
    pw1.join()
    # pw2.join()  # only join processes whose start() call above is uncommented
    # pw3.join()
    # pw4.join()
    print("****************** everything is ok, please terminate ******************")
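
One caveat on the batch step in asyncgetHtmlText above: passing coroutine objects straight to asyncio.wait() was deprecated in Python 3.8 and removed in 3.11, and calling asyncio.get_event_loop() outside a running loop is also discouraged on recent versions. If you reuse this pattern today, an equivalent batch loop could look like the sketch below (assuming Python 3.7+ and the asyncGetSingleHtmlText and async_get_new_cookie defined above); it is an alternative, not the original author's code.

def asyncgetHtmlText(q, url_list_borrower):
    BATCH_TEST_SIZE = 500
    count = len(url_list_borrower)

    async def run_batch(urls, cookies):
        # gather() accepts coroutines directly and runs the whole batch concurrently
        await asyncio.gather(*(asyncGetSingleHtmlText(u, cookies, q) for u in urls))

    for start in range(0, count, BATCH_TEST_SIZE):
        stop = min(start + BATCH_TEST_SIZE, count)
        cookies = async_get_new_cookie()  # refresh the login cookie once per batch
        # asyncio.run() creates and closes a fresh event loop for each batch
        asyncio.run(run_batch(url_list_borrower[start:stop], cookies))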