python代码实现yunfile文件批量自动化下载
作者:互联网
这个源码我是先发表在52pojie了,(*^▽^*),如有侵权,请联系删
--->说明: 使用python2.7+selenium+chrome v49+百度ocr识别,基本上pip install 相关的程序,源码就可以直接运行了,最新chrome版本的话,打开网页太卡了。
网盘的(30)秒验证码限制和(10)分钟等待还是存在的。好处就是url很多的时候,丢那边就不用管了, 需要在源码所在目录,下图所示,创建file.txt 、recode.txt 、config.txt
把url全部(一行一个url)放在file.txt中。程序启动时会逐条检查file.txt中的url是否已记录在recode.txt中,recode.txt中不存在的url会被保存到newfile.txt,程序最终打开的是newfile.txt中的url。
config.txt中定义百度OCR识别的appId、apiKey、secretKey,以及pic_path(验证码截图)、urlfile(file.txt)、recodefile(recode.txt)的路径;down_path是chrome的下载路径(由于不知道怎么自动获取,所以一定要手动填,否则判断文件是否存在时会出错,并等待很长时间)。
appId =kge369
apiKey =kge369
secretKey =kge369
pic_path =.\1.png
urlfile =.\file.txt
down_path =C:\Users\Administrator\Downloads\
recodefile =.\recode.txt
#-*- coding:utf-8 -*- from __future__ import print_function import time from datetime import datetime from selenium import webdriver from selenium.webdriver.common.by import By from PIL import Image,ImageEnhance import os import re from aip import AipOcr import winsound import sys ################################################################### ###################百度OCR识别##################################### ''' config = { 'appId': 'kge369', 'apiKey': 'kge369', 'secretKey': 'kge369' } #client = AipOcr(**config) ''' def get_file_content(file): try: with open(file,'rb') as fp: return fp.read() except: print ('dakai yanzhengma tupian fail') reset_page(driver) time.sleep(10) code_ver(driver) def img_to_str(image_path): try: image = get_file_content(image_path) result = client.basicGeneral(image) except: global baidu print (u'百度不通,刷新页面,错误次数'+ baidu) if baidu < 5: baidu += 1 else: sys.exit() return baidu reset_page(driver) time.sleep(10) code_ver(driver) for i in result.get('words_result'): return (i.get('words')) ################################################################### #########################读取玉预配置文件#################################### def getfile(): fp = open(r'.\config.txt','r') lines = fp.readlines() list = {} for line in lines: line = line.replace(' ', '') line = line.strip('\n') ss = line.split('=') list[ss[0]] = ss[1] #print (result) #key[name] = list[ss[1]] #return key[name] fp.close() return list def get(name): ss = getfile().get(name) return ss ################判断是否为数字################################ def is_number(s): try: float(s) return True except ValueError: pass try: import unicodedata unicodedata.numeric(s) return True except (TypeError, ValueError): pass return False #########################用不上了##################################### #定义一个函数,带有4个参数 #x 表示要更新的文件名称 #y 表示要被替换的内容 #z 表示 替换后的内容 #s 默认参数为 1 表示只替换第一个匹配到的字符串 # 如果参数为 s = 'g' 则表示全文替换 def string_switch(x,y,z,s=1): with open(x, "r") as f_r: #readlines以列表的形式将文件读出 lines = f_r.readlines() 
with open(x, "w") as f_w: #定义一个数字,用来记录在读取文件时在列表中的位置 n = 0 #默认选项,只替换第一次匹配到的行中的字符串 if s == 1: for line in lines: if y in line: line = line.replace(y,z) f_w.write(line) n += 1 break f_w.write(line) n += 1 #将剩余的文本内容继续输出 for i in range(n,len(lines)): f_w.write(lines[i]) #全局匹配替换 elif s == 'g': for line in lines: if y in line: line = line.replace(y,z) f_w.write(line) #########################用不上了#################################################### ###对比file.txt文件的url和recode记录,如果未下载则=于 -1,则记录到新文件 ###使用新文件的url来下载即可。不用每次都判断 def check_down_file(): try: print(u"检测file的url是否已下载") with open(urlfile,'a+') as check_url: with open("newfile.txt",'w+') as write: write.write("") fileurls = check_url.readlines() for fileurl in fileurls: fileurl = fileurl.strip('\n') with open(recodefile,'a+') as read: if read.read().find(fileurl) == -1 and fileurl.find("fs") > 0: with open("newfile.txt",'a+') as write_new: write_new.write(fileurl + '\n') print(fileurl) except: print(u"呵呵,程序错误了吧!!!联系kge369编程的这个家伙吧") pass ############################################################################## def voice(): duration = 5000 # millisecond freq = 1600 #hz winsound.Beep(freq,duration) # def yanshi(sj): time_start = datetime.now() t = "|/-\\" for i in range(0,sj + 1): time_now = datetime.now() print (u'等待时间剩余 : ' + str(sj - (time_now - time_start).seconds),end="\r") time.sleep(1) print (' ',end="\r") # def is_element_visible(driver,element): try: driver.find_element_by_class_name(element) flag = True except: flag = False return flag # def is_element_id(driver,element): try: driver.find_element_by_id(element) flag = True except: flag = False return flag # def is_num(num): try: int(num) return True except ValueError: #print "%s ValueError" % num return False #### def reset_page(driver): with open(recodefile,'a+') as ReP: if ReP.read().find(url) > -1: pass else: try: driver.refresh() print(u'刷新页面') except: print(u'时间超时,停止刷新页面') driver.execute_script("window.stop()") pass #### def get_times(driver): 
###检测平均时间是否为10分钟,超过10分钟则认定文件过大 try: print(u"检测平均时间id") driver.find_element_by_id("pingjun_downtime_str1") except: print(u"平均时间id未找到") reset_page(driver) yanshi(10) code_ver(driver) else: with open(recodefile,'a+') as gt_file: if gt_file.read().find(url) != -1: pass else: gettimes = driver.find_element_by_id("pingjun_downtime_str1").text gettimes = re.sub("\D",'',gettimes) print(u'平均下载时间为: ' + gettimes) if gettimes == "10": get_auth_code(driver) else: print (u'文件下载平均超过10分钟,请手动下载') def code_ver(driver): try: print (u'检测广告阶段') driver.find_element_by_class_name("ad-close-btn") driver.find_element_by_class_name("ad-close-btn").click() except: with open(recodefile,'a+') as code_file: if code_file.read().find(url) != -1: print(u"检测链接是否已下载,下载则pass") pass else: try: len(driver.find_element_by_class_name("message_title").text) > 3 except: pass else: if len(driver.find_element_by_class_name("message_title").text) > 3: print(u"资源链接不存在,出错了!") pass else: print(u"检测不到yunfile的广告,刷新") reset_page(driver) yanshi(10) code_ver(driver) else: with open(recodefile,'a+') as code2_file: if code2_file.read().find(url) != -1: print(u"检测链接是否已下载,下载则pass") pass else: print (u'广告存在,关闭广告,等待2秒进入下载平均时间判断') time.sleep(2) get_times(driver) def down_ver(driver): print (u'下载页面广告检测阶段') s = is_element_visible(driver,"ad-close-btn") #print (s) if s: #time.sleep(1) try: driver.find_element_by_class_name("ad-close-btn").click() except: print(u"点击广告失败,重新刷新页面") reset_page(driver) yanshi(10) code_ver(driver) print (u'关闭广告后进去下载界面') downfile(driver) else: time_start = datetime.now() while not s: recheck_s = is_element_visible(driver,"ad-close-btn") time_now = datetime.now() if recheck_s: print (u'关闭广告后进去下载界面2') try: driver.find_element_by_class_name("ad-close-btn").click() except: print(u"点击广告失败,重新刷新页面") time.sleep(1) reset_page(driver) yanshi(10) code_ver(driver) print (u'关闭广告后进去下载界面2') downfile(driver) break elif(time_now - time_start).seconds > 20: print ("timer out! 
turunto get_auth_code!!!") get_auth_code(driver) break else: continue def downfile(driver): print (u'进入下载页面') try: driver.find_element_by_id("downbtn") time.sleep(1) driver.find_element_by_id("downbtn").click() except: print (u'下载链接未找到') ###判断是不是显示11分钟,是的话说明不能继续下载了,退出程序### try: s_d = driver.find_element_by_id("common_speed_down").text p_d = re.sub("\D",'',s_d) print (p_d) if int(p_d) == 11: sys.exit() else: reset_page(driver) time.sleep(10) code_ver(driver) except: reset_page(driver) time.sleep(10) code_ver(driver) else: print (u'下载链接存在,准备下载...等待10秒检测文件是否存在') time.sleep(10) path = down + title if os.path.isfile(path): time_now1 = datetime.now() with open(recodefile,'a+') as f_recode: f_recode.write(str(time_now1) + ': ' + url + title + '\n') print (u'下载文件成功1') yanshi(610) else: time_start = datetime.now() while not os.path.isfile(path): time_now = datetime.now() path2 = down + title if os.path.isfile(path2): print (u"超过10秒,继续检测文件是否存在") #recodefile = 'recode.txt' with open(recodefile,'a+') as f_recode2: f_recode2.write(str(time_now) + ': ' + url + title + '\n') print (u'下载文件成功2') string_switch(faildown,url + title,url + "haved down","g") False break elif(time_now - time_start).seconds > 640: print (u'超过10分钟下载文件失败') break else: print (u'下载文件用时: '+ str((time_now - time_start).seconds) + u"秒",end="\r") time.sleep(1) print (' ',end="\r") try: if len(driver.find_element_by_class_name("message_title").text) > 3: print(u"资源链接不存在,出错了!") False break except: pass def get_auth_code(driver): '''获取验证码''' while True: print (u'获取验证码阶段') #driver.find_element_by_class_name("ad-close-btn").click() #title = driver.title if len(title) > 0: print (title) time.sleep(1) try: driver.find_element_by_id("common_speed_down") except: print(u'没找到普通下载按钮,刷新页面') #driver.refresh() reset_page(driver) time.sleep(10) code_ver(driver) break else: s = driver.find_element_by_id("common_speed_down").text p = re.sub("\D",'',s) print (s) if is_num(p) == False: print (is_num(p)) #False else: print 
('dengdai') #driver.refresh() reset_page(driver) timer = int(p)*60 + 59 yanshi(timer) code_ver(driver) break driver.find_element_by_id("common_speed_down").click() time.sleep(3) image_view(driver) break def image_view(driver): try: driver.save_screenshot(pic_path) except: print (u'截图保存失败,刷新页面') reset_page(driver) yanshi(10) code_ver(driver) else: #dingwei yanzhengma weizhi time.sleep(1) location = driver.find_element_by_id('cvimg2').location size = driver.find_element_by_id('cvimg2').size left = location['x'] top = location['y'] right = location['x'] + size['width'] bottom = location['y']+ size['height'] try: #从保存的截图中截取验证码,再次保存 img = Image.open(pic_path).crop((left,top,right,bottom)) img.save(pic_path) except: print (u'保存验证码图片失败,刷新页面') reset_page(driver) yanshi(10) code_ver(driver) left = '' top = '' right = '' bottom = '' time.sleep(2) #cichu shi pytes wenzi shibie...import yi # #aaa = pytesseract.image_to_string(Image.open(pic_path),lang='fontyp') try: #img_to_str是到百度ocr读取 aaa = img_to_str(pic_path) print(aaa , 'yanzhengma') if len(aaa) == 4 and is_number(aaa) == True: print (u'识别的验证码为4位数字') except: print ('None fail') reset_page(driver) yanshi(10) code_ver(driver) else: if len(aaa) != 4 or is_number(aaa) == False: #验证码错误,重新刷新页面 print (u'验证码错误,刷新页面') reset_page(driver) yanshi(10) code_ver(driver) try: driver.find_element_by_id('vcode') except: reset_page(driver) time.sleep(10) code_ver(driver) else: code = driver.find_element_by_id('vcode') code.clear() code.send_keys(aaa) time.sleep(1) driver.find_element_by_id("slow_button").click() yanshi(30) down_ver(driver) time.sleep(1) if __name__ == '__main__': #pic_path = r'.\1.png' pic_path = get('pic_path') urlfile = get('urlfile') appId = get('appId') apiKey = get('apiKey') secretKey = get('secretKey') client = AipOcr(appId,apiKey,secretKey) down = get('down_path') recodefile = get('recodefile') new = "newfile.txt" print(pic_path + '--->pic file path') print(urlfile + '--->url file path') print(down + '--->down path') 
print(recodefile + '--->suessful down file recode') check_down_file() try: driver = webdriver.Chrome() driver.set_page_load_timeout(30) driver.set_script_timeout(30) f1 = file(new) #while True: urls = f1.readlines() for url in urls: url = url.strip('\n') print(url) baidu = 1 try: driver.get(str(url)) title = driver.title except: print(u'加载页面不完整,停止加载,记录在faildown中') driver.execute_script("window.stop()") else: print(u'判断recode文件是否已经存在该链接') with open(recodefile,'a+') as db_openurl: if db_openurl.read().find(url) == -1: code_ver(driver) else: print(u'该网页已下载过,跳过该网页') if len(urls) == 0: print(u"链接已全部打开完毕") #break finally: f1.close() print ('Done') voice() ####大致流程main--》code-ver--》get_times--》get_auth_code--》image_view--》down_ver--》downfile #####
源码中需要pip下面几个
pip install pillow
pip install selenium
pip install baidu-aip
编译后exe下载链接:大家还是尽量用源码吧,成品要用chrome 49版本来运行
链接: https://pan.baidu.com/s/1J8NZllZu4FEE3q4vY4f45Q 提取码: wzgn
标签:code,python,yunfile,driver,element,time,print,文件批量,find 来源: https://www.cnblogs.com/kge369/p/11022187.html