数据采集:selenium 自动翻页接口调用时的验证码处理

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》

写在前面


  • 工作中遇到,简单整理
  • 理解不足小伙伴帮忙指正

对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》


数据采集的过程中,有部分页面会在接口调用到一定次数之后,每次获取数据调用接口之后,弹出一个验证码的校验,作为一种反爬措施,对于这种接口调用验证码,一般情况下,而且是随机的,比如页面中有好多搜索框,可能每个搜索框的change 事件都会发生一次接口调用,这个时候使用 selenium 自动化提提取数据,会导致处理的页面不是想要的的页面,所以对于这种验证码的处理,我们需要在页面任意位置,提供一个检测跳转验证码的方法,同时对验证码做校验处理。

下面为一个 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def cap(driver):
"""
@Time : 2023/08/29 03:38:33
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 验证码处理
Args:

Returns:
void
"""
import ddddocr
ocr = ddddocr.DdddOcr()
time.sleep(3)
while len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
element = driver.find_element(By.XPATH, "//img[ @id='vcodeimg' ]")
# 清空验证码数据
driver.execute_script("arguments[0].value = ''", element)


#定位元素并获取截图文件:
element.screenshot("element.png")
with open("element.png", "rb") as f:
image_bytes = f.read()

#image_bytes = BytesIO(base64.b64decode(screenshot))
text = ocr.classification(image_bytes)
if len(text) >4:
text = text[1:5]
driver.find_element(By.XPATH, "//input[@id='vcode']").send_keys(text)
time.sleep(3)
driver.find_element(By.XPATH, "//input[@class='isOK']").click()
time.sleep(3)
if len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
driver.get("https://icp.chinaz.com/captcha")


在实际的编写中需要注意的地方:

  • 获取验证码图片的方式,是通过对元素截图,还是对照片路径请求下载获取,需要注意有些验证码图片,在通过 requests 库下载图片时,每次调用都是不同的图片,所以只能使用截图的方式
  • 验证码识别的方式,可以考虑使用 ocr或者深度学习模型,或者一些商业接口,上面使用的 pip install ddddocr
  • 对于识别不准的情况,可以考虑做一些后期的约束处理,比如上面的验证码,4位数字,但是在第一位会出现一个干扰字符,ocr 偶尔会识别为字符,需要做切割处理。
  • 进行识别的时机,以及识别后的处理,对于如何开始识别,可以通过关键字来进行判断,放到入口处,对于识别后验证失败的处理也需要考虑,上面的页面在识别验证成功会进行跳转,错了不发生跳转
  • 对于错误的情况,可以使用死循环的,重新请求,获取新的验证码,直到识别验证成功。

下面为一个数据采集的实际脚本中的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@File : icp_reptile.py
@Time : 2023/08/23 23:07:46
@Author : Li Ruilong
@Version : 1.0
@Contact : liruilonger@gmail.com
@Desc : 验证码版本
"""

# here put the import lib


from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import re
import pandas as pd
import csv
import sys
import os
import json
import requests
import ddddocr
from io import BytesIO
import base64
import pytesseract
from PIL import Image


a_name = ['河北']
ocr = ddddocr.DdddOcr()


"""
自动登陆,需要提前保存登陆cookie 信息
"""
driver = webdriver.Chrome()
with open('C:\\Users\山河已无恙\\Documents\GitHub\\reptile_demo\\demo\\cookie_vip.json', 'r', encoding='u8') as f:
cookies = json.load(f)

driver.get('https://icp.chinaz.com/provinces')
for cookie in cookies:
driver.add_cookie(cookie)

driver.get('https://icp.chinaz.com/provinces')

wait = WebDriverWait(driver, 30)


## 查询条件准备

"""
查询条件准备
"""

#wait.until(EC.presence_of_element_located((By.XPATH, "//span[ @title='chinaz_7052291' ]")))


def cap(driver):
"""
@Time : 2023/08/29 03:38:33
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 验证码处理
Args:

Returns:
void
"""
time.sleep(3)
while len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
element = driver.find_element(By.XPATH, "//img[ @id='vcodeimg' ]")
# 清空验证码数据
driver.execute_script("arguments[0].value = ''", element)


#定位元素并获取截图文件:
element.screenshot("element.png")
with open("element.png", "rb") as f:
image_bytes = f.read()

#image_bytes = BytesIO(base64.b64decode(screenshot))
text = ocr.classification(image_bytes)
if len(text) >4:
text = text[1:5]
driver.find_element(By.XPATH, "//input[@id='vcode']").send_keys(text)
time.sleep(3)
driver.find_element(By.XPATH, "//input[@class='isOK']").click()
time.sleep(3)
if len(driver.find_elements(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )) > 0:
driver.get("https://icp.chinaz.com/captcha")



time.sleep(5)
# 触发验证码处理
all_butt_cap = driver.find_element(By.XPATH,"//h1[contains(text(),'输入验证码刷新') ] " )

# 处理验证码的情况

cap(driver)

time.sleep(5)

### 查询条件准备

# 备案时间
all_butt = driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @class='pr10' ] " )
driver.execute_script("arguments[0].click();", all_butt)
cap(driver)
# 单位性质
all_butt = driver.find_element(By.XPATH,"//div[contains(text(),'全部') and @class='MainCateW-cont SearChoese'] " )
driver.execute_script("arguments[0].click();", all_butt)
time.sleep(3)
cap(driver)
# 企业
all_butt = driver.find_element(By.XPATH,"//a[contains(text(),'企业') and @val='企业' ]" )
driver.execute_script("arguments[0].click();", all_butt)
time.sleep(2)

cap(driver)
# 状态
all_butt = driver.find_element(By.XPATH,"//div[contains(text(),'全部') and @id='webStatus_txt' and @class='MainCateW-cont SearChoese w90'] " )

driver.execute_script("arguments[0].click();", all_butt)
time.sleep(2)
# 已开通
all_butt = driver.find_element(By.XPATH,"//a[contains(text(),'已开通') and @val='1' ]" )

driver.execute_script("arguments[0].click();", all_butt)
time.sleep(2)
cap(driver)

# 地区
all_butt = driver.find_element(By.XPATH,"//strong[contains(text(),'地区:') and @class='CateTit' ]" )
next_element = all_butt.find_element(By.XPATH,"following-sibling::*[1]")
driver.execute_script("arguments[0].click();", next_element)
time.sleep(2)



def area(p_name,driver,p_data):
"""
@Time : 2023/08/24 04:24:50
@Author : liruilonger@gmail.com
@Version : 1.0
@Desc : 备案数据获取
Args:

Returns:
void
"""


all_butt = driver.find_element(By.XPATH,"//a[contains(text(),'"+p_name+"') ]" )
driver.execute_script("arguments[0].click();", all_butt)
#all_butt.click()
time.sleep(2)

# 页数太对分盟市处理,后面的数据没办法直接处理
all_butt = driver.find_element(By.XPATH,"//div[contains(text(),'全部') and @id='addrctxt' ]" )
time.sleep(2)
driver.execute_script("arguments[0].click();", all_butt)
all_ui = driver.find_element(By.XPATH,"//ul[ @id='addrclst'] " )
citys = all_ui.find_elements(By.TAG_NAME,'a')
for city in citys:
try:
c_n =city.text
except:
print("页面异常")
continue

if c_n == '全部':
continue
print("处理市:",c_n)
time.sleep(2)
driver.execute_script("arguments[0].click();", city)
time.sleep(5)
# 验证码处理
cap(driver)

time.sleep(5)
# 查询
cap(driver)
# 选择全部
all_butt = driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @val='all' ] " )
driver.execute_script("arguments[0].click();", all_butt)
time.sleep(5)
cap(driver)
#all_butt = driver.find_element(By.XPATH,"//input[ @type='button' and @id='btn_search' and @value='点击搜索'] " )
#driver.execute_script("arguments[0].click();", all_butt)
#time.sleep(4)
#cap(driver)
time.sleep(10)
def all_break(driver):
if len(driver.find_elements(By.XPATH,"//span[contains(text(),'页,到第') and @class='col-gray02'] " )) == 0:
all_butt = driver.find_element(By.XPATH,"//div/a[contains(@href,'all') and @val='all' ] " )
driver.execute_script("arguments[0].click();", all_butt)
time.sleep(5)
cap(driver)
time.sleep(10)
#all_break(driver)
# 总页数获取
page_butt = driver.find_element(By.XPATH,"//span[contains(text(),'页,到第') and @class='col-gray02'] " )

page_c = int(re.search(r'\d+', page_butt.text).group())
# 盟市页数太多分时间段处理

# 当前页数据处理
tbody = driver.find_element(By.XPATH,"//tbody[ @class='result_table' and @id='result_table' ]")
rows = tbody.find_elements(By.TAG_NAME,'tr')
for row in rows:
# 获取当前行中的所有单元格
cells = row.find_elements(By.TAG_NAME, "td")
# 打印单元格数据
data = {}
data['域名']=cells[0].text
data['主办单位名称']=cells[1].text
data['网站首页网址']=cells[5].text
p_data.append(data)

# 其他页数据处理
if page_c >= 101:
page_c = 101
for page_i in range(1,page_c):
try:
print(f"{c_n} :处理页数",page_i)
nextPage = driver.find_element(By.XPATH,"//a[ @title='下一页' and @id='nextPage' ]")
driver.execute_script("arguments[0].click();", nextPage)
time.sleep(3)
cap(driver)
all_break(driver)
cap(driver)
#nextPage.click()
time.sleep(6)
tbody = driver.find_element(By.XPATH,"//tbody[ @class='result_table' and @id='result_table' ]")
rows = tbody.find_elements(By.TAG_NAME,'tr')
print(tbody.text)
for row in rows:
# 获取当前行中的所有单元格
cells = row.find_elements(By.TAG_NAME, "td")
# 打印单元格数据
data = {}
data['域名']=cells[0].text
data['主办单位名称']=cells[1].text
data['网站首页网址']=cells[5].text
p_data.append(data)
time.sleep(6)

except:
print(f"第 { page_i} 页发生了异常,跳过了")
pass
finally:
fieldnames = ['域名', '主办单位名称', '网站首页网址']
with open('省份_'+a+'_'+"ICP"+'.csv', 'w', newline='',encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader() # 写入列名
writer.writerows(p_data) # 写入字典数据

print("数据已保存为CSV文件",' CDN_M_省份_'+a+'_'+'ICP'+'.csv')
return p_data

if __name__ == '__main__':
for a in a_name:
p_data= []
try:
p_data = area(a,driver,p_data)
except:
continue
pass
finally:
fieldnames = ['域名', '主办单位名称', '网站首页网址']
with open('省份_'+a+'_'+"ICP"+'.csv', 'w', newline='',encoding='utf-8') as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader() # 写入列名
writer.writerows(p_data) # 写入字典数据
print("数据已保存为CSV文件",' CDN_M_省份_'+a+'_'+'ICP'+'.csv')

time.sleep(55555)


博文部分内容参考

© 文中涉及参考链接内容版权归原作者所有,如有侵权请告知,这是一个开源项目,如果你认可它,不要吝啬星星哦 :)



© 2018-至今 liruilonger@gmail.com, All rights reserved. 保持署名-非商用-相同方式共享(CC BY-NC-SA 4.0)

发布于

2023-09-07

更新于

2024-11-22

许可协议

评论
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×