''' python 3.5.2 '''
import os
import random
import re
import time

import requests
from bs4 import BeautifulSoup

def ip_test(ip, url_for_test='https://www.baidu.com', set_timeout=10):
    '''
    Test whether a scraped proxy works: return True if it is usable, otherwise False.
    By default the proxy is tested by requesting Baidu through it.
    :param ip: (host, port) tuple as scraped from the proxy list
    :param url_for_test:
    :param set_timeout:
    :return:
    '''
    try:
        r = requests.get(url_for_test, headers=headers,
                         proxies={'http': ip[0] + ':' + ip[1]},
                         timeout=set_timeout)
        return r.status_code == 200
    except:
        return False
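
# Usage sketch (illustrative; the address below is a made-up placeholder, not a real proxy):
#   ip_test(('1.2.3.4', '8080'))   # -> True only if the proxy reaches Baidu within 10 s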

def scrawl_ip(url, num, url_for_test='https://www.baidu.com'):
    '''
    Scrape proxy IP addresses; the source of the proxies is the Xici (xicidaili) site.
    :param url: base listing URL, the page number is appended to it
    :param num: number of listing pages to scrape
    :param url_for_test:
    :return: list of working proxies as 'host:port' strings
    '''
    ip_list = []
    for num_page in range(1, num + 1):
        page_url = url + str(num_page)
        response = requests.get(page_url, headers=headers)
        response.encoding = 'utf-8'
        content = response.text

        # Every table row yields a (host, port) pair.
        pattern = re.compile('<td class="country">.*?alt="Cn" />.*?</td>.*?<td>(.*?)</td>.*?<td>(.*?)</td>', re.S)
        items = re.findall(pattern, content)
        for ip in items:
            if ip_test(ip, url_for_test):
                print('Test passed, proxy address: ' + str(ip[0]) + ':' + str(ip[1]))
                ip_list.append(ip[0] + ':' + ip[1])
        time.sleep(5)
    return ip_list
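
# Usage sketch (illustrative): scrape two Xici listing pages and keep the proxies that
# pass ip_test; the result feeds the global total_ip pool built near the bottom of the script.
#   total_ip = scrawl_ip("http://www.xicidaili.com/nt/", 2)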

def get_random_ip():
    ind = random.randint(0, len(total_ip) - 1)
    return total_ip[ind]
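
# Note: get_random_ip() draws from the global total_ip list assembled at the bottom of
# the script; it is passed to requests as proxies={'http': get_random_ip()}.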

def download_img(img_list, img_title):
    '''
    Once scrawl_url has returned the <img> tags of a single album together with the
    album title, this function downloads every image in that album.
    img_list holds the album's <img> tags; their 'src' attributes are URLs such as
    ['http://mm.howkuai.com/wp-content/uploads/2017a/02/07/01.jpg',
     'http://mm.howkuai.com/wp-content/uploads/2017a/02/07/02.jpg', ...]
    img_title is the album title, e.g. '香车美女,最完美的黄金搭档', and is used as the
    folder name.
    :param img_list:
    :param img_title:
    :return:
    '''
    img_title = format_name(img_title)
    for img_urls in img_list:
        img_url = img_urls.attrs['src']
        print(img_url)
        title = img_urls.attrs['alt']
        print(title)
        try:
            if not os.path.exists(os.path.join(file_path, img_title)):
                os.makedirs(os.path.join(file_path, img_title))
            os.chdir(os.path.join(file_path, img_title))
            # Skip images that have already been downloaded.
            if not os.path.exists(title + ".jpg"):
                try:
                    img_html = requests.get(img_url, headers=headers, stream=True,
                                            timeout=20, verify=True)
                    with open(title + ".jpg", 'wb') as f:
                        f.write(img_html.content)
                except:
                    continue
        except:
            continue
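
# Usage sketch (illustrative; assumes img_list and img_title come from scrawl_url below):
#   download_img(img_list, img_title)   # saves the images under file_path/<cleaned title>/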

def scrawl_list(url_list, proxy_flag=False, try_time=0):
    '''
    Scrape the album URLs on one listing page. A listing page contains 10 albums,
    so one call returns a list of up to 10 album URLs such as
    ['http://www.meizitu.com/a/5499.html', ...].
    :param url_list: URL of the listing page
    :param proxy_flag: retry through a proxy when the direct request fails
    :param try_time: number of proxy attempts made so far
    :return:
    '''
    if not proxy_flag:
        # First try a direct request.
        try:
            html = requests.get(url_list, headers=headers, timeout=10)
            html.encoding = 'gb2312'
            text = html.text
            bsop = BeautifulSoup(text, 'html.parser')
            url_imgs = []
            li_list = bsop.find('ul', {'class': 'wp-list clearfix'}).findAll('li', {'class': 'wp-item'})
            for i in li_list:
                url_img = i.find('h3', {'class': 'tit'}).find('a').attrs['href']
                url_imgs.append(url_img)
            return url_imgs
        except:
            return scrawl_list(url_list, proxy_flag=True)
    else:
        # Retry through random proxies, at most count_time times.
        if try_time < count_time:
            try:
                print('Proxy download attempt ' + str(try_time + 1))
                html = requests.get(url_list, headers=headers,
                                    proxies={'http': get_random_ip()}, timeout=10)
                html.encoding = 'gb2312'
                text = html.text
                bsop = BeautifulSoup(text, 'html.parser')
                url_imgs = []
                li_list = bsop.find('ul', {'class': 'wp-list clearfix'}).findAll('li', {'class': 'wp-item'})
                for i in li_list:
                    url_img = i.find('h3', {'class': 'tit'}).find('a').attrs['href']
                    url_imgs.append(url_img)
                print('Status code: ' + str(html.status_code))
                if html.status_code == 200:
                    print('url_imgs fetched successfully through a proxy!')
                    return url_imgs
                else:
                    return scrawl_list(url_list, proxy_flag=True, try_time=(try_time + 1))
            except:
                print('Proxy download of url_imgs failed, trying the next proxy')
                return scrawl_list(url_list, proxy_flag=True, try_time=(try_time + 1))
        else:
            print('Failed to scrape url_imgs, please check the page')
            return None
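
# Usage sketch (illustrative):
#   scrawl_list('http://www.meizitu.com/a/list_1_1.html')
#   # -> up to 10 album URLs, e.g. ['http://www.meizitu.com/a/5499.html', ...]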

def scrawl_url(url, proxy_flag=False, try_time=0):
    '''
    Scrape a single album. An album contains several images, each with its own real
    URL that has to be extracted. The function takes the album URL, e.g.
    'http://www.meizitu.com/a/5499.html', and returns the album's <img> tags together
    with the album title. All images share that title, so it can be used as the name
    of the folder they are saved to.
    :param url:
    :param proxy_flag:
    :param try_time:
    :return:
    '''
    if not proxy_flag:
        try:
            html = requests.get(url, headers=headers, timeout=10)
            html.encoding = 'gb2312'
            text = html.text
            bsop = BeautifulSoup(text, 'html.parser')
            img_list = bsop.find('div', {'class': 'postContent'}).find('p').findAll('img')
            img_title = bsop.find('div', {'class': 'metaRight'}).find('h2').find('a').text
            return img_list, img_title
        except:
            return scrawl_url(url, proxy_flag=True)
    else:
        if try_time < count_time:
            try:
                print('Proxy download attempt ' + str(try_time + 1))
                html = requests.get(url, headers=headers,
                                    proxies={'http': get_random_ip()}, timeout=30)
                html.encoding = 'gb2312'
                text = html.text
                bsop = BeautifulSoup(text, 'html.parser')
                img_list = bsop.find('div', {'class': 'postContent'}).find('p').findAll('img')
                img_title = bsop.find('div', {'class': 'metaRight'}).find('h2').find('a').text
                print('Status code: ' + str(html.status_code))
                if html.status_code == 200:
                    print('Images fetched successfully through a proxy!')
                    return img_list, img_title
                else:
                    return scrawl_url(url, proxy_flag=True, try_time=(try_time + 1))
            except:
                print('Proxy download failed')
                return scrawl_url(url, proxy_flag=True, try_time=(try_time + 1))
        else:
            print('Failed to scrape the image URL list, please check the page')
            return None
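
# Usage sketch (illustrative):
#   img_list, img_title = scrawl_url('http://www.meizitu.com/a/5499.html')
#   # img_list: the album's <img> tags (their 'src' attributes are the image URLs)
#   # img_title: the album title, later used as the folder name by download_img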

def download_urls(pages):
    '''
    Scrape the album URLs of every listing page. The result is a two-dimensional list
    with one inner list of album URLs per listing page, e.g.
    url_imgss = [
        ['http://www.meizitu.com/a/5499.html', ...],
        ...
    ]
    '''
    url_imgss = []
    for i in range(1, pages + 1):
        try:
            url_list = 'http://www.meizitu.com/a/list_1_' + str(i) + '.html'
            url_imgs = scrawl_list(url_list)
            if not url_imgs:
                continue
            url_imgss.append(url_imgs)
            print("Page " + str(i) + " URLs scraped successfully")
            time.sleep(5)
        except:
            continue
    return url_imgss
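
# Usage sketch (illustrative):
#   download_urls(2)   # scrape listing pages 1 and 2, one inner list of album URLs per page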

def format_name(img_title):
    '''
    Clean the album title: characters that are not allowed in file names are removed.
    :param img_title:
    :return:
    '''
    for i in ['\\', '/', ':', '*', '?', '"', '<', '>', '!', '|']:
        while i in img_title:
            img_title = img_title.strip().replace(i, '')
    return img_title
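
# Usage sketch (illustrative; the title is a hypothetical example):
#   format_name('abc:def?')   # -> 'abcdef'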

def get_total_pages(first_url):
    '''
    Get the total number of listing pages of the site from the pager on the first page.
    :param first_url:
    :return:
    '''
    html = requests.get(first_url, headers=headers, timeout=10)
    html.encoding = 'gb2312'
    text = html.text
    bsop = BeautifulSoup(text, 'html.parser')
    lis = bsop.find('div', {'id': 'wp_page_numbers'}).find('ul').findAll('li')
    # The href of the last pager item looks like 'list_1_<N>.html'; N is the page count.
    pages = lis[-1].find('a').attrs['href'].split('.')[0].split('_')[-1]
    pages = int(pages)
    return pages
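
# Usage sketch (illustrative):
#   get_total_pages('http://www.meizitu.com/a/list_1_1.html')
#   # -> the number of listing pages as an int (the value depends on the site at crawl time)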

first_url = 'http://www.meizitu.com/a/list_1_1.html'   # first listing page of the site
url_ip = "http://www.xicidaili.com/nt/"                # Xici proxy listing; the page number is appended
set_timeout = 10                                       # default timeout for testing a proxy
num = 2                                                # number of Xici pages to scrape for proxies
count_time = 5                                         # maximum number of proxy retries

UserAgent_List = [
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
    "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
    "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
    "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
]

headers = {
    'User-Agent': random.choice(UserAgent_List),
    'Accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
    'Accept-Encoding': 'gzip',
}
# Download directory (Windows path); a raw string keeps the backslashes literal.
file_path = r'E:\selfprogress\programming\project\meizitu'

pages = get_total_pages(first_url)    # total number of listing pages
total_ip = scrawl_ip(url_ip, num)     # pool of working proxies used by get_random_ip()
url_imgss = download_urls(pages)      # album URLs of every listing page

# Keep a record of every scraped album URL in url.txt.
for i in url_imgss:
    for j in i:
        try:
            with open('url.txt', 'a') as f:
                f.write(j + "\n")
            print("Wrote URL to url.txt")
        except:
            print("Failed to write URL to url.txt")

# Download every album; skip albums whose page could not be scraped.
for url_imgs in url_imgss:
    for url_img in url_imgs:
        result = scrawl_url(url_img)
        if not result:
            continue
        img_list, img_title = result
        download_img(img_list, img_title)
        time.sleep(5)