# 91家纺网 (91jf.com) 本地版:没有 ODPS 代码模块,只有 Redis 和 MySQL;
# 数据库配置需要参考无 ODPS 版本的数据库配置。
# 作者:互联网
# --- Imports (stdlib first, then third-party, then local) --------------------
import random
import re
import threading
import time
from datetime import datetime
from threading import Thread
from urllib import parse

import requests
from scrapy import Selector

# Presumably provides the peewee models (Catalogue, Product, Store,
# Product_attributes) and the Redis client `r` used below — TODO confirm.
from models import *

# --- Module-level state ------------------------------------------------------
store_list_urls = []    # collected store listing URLs
product_list_urls = []  # collected product listing URLs

# --- URL templates -----------------------------------------------------------
domain = "http://www.91jf.com/"
# Paginated store list; append a page number.
store_domain = "http://www.91jf.com/default.php?act=corp&sort=list&page="
store_url_domain = 'http://www.91jf.com/default.php?act=store_goods&storeid='  # 用于拼接商户id和url
category_url = "http://www.91jf.com/default.php?act=categorygoodslist&category_id="  # 用来拼接商品的url
stor_url_aptitude = 'http://www.91jf.com/default.php?act=corpcert&id='  # 用于拼接商户资质的url
# 获取一级目录数据,保存商品系列ID,用来拼接爬虫入口的url
def process_nodes_list(url):
    """Scrape the top-level category menu and persist the category tree.

    Fetches the category menu page at *url*, inserts each top-level
    category (series_level 0, sentinel category_id 0), then inserts every
    child series under it with its numeric category_id and a father_id
    pointing at the parent row. Rows that already exist (same name and
    category_id) are left untouched.
    """
    menu_text = requests.get(url).text
    sel = Selector(text=menu_text)
    nodes_list = sel.xpath("//div[@class='index_g_class']/ul/li")
    for item in nodes_list:
        # 主目录的名称 (top-level category name).
        title = ''.join(item.xpath("./div[@class='class_menu']/span/text()").extract())

        catalogue = Catalogue()
        catalogue.catalogue_name = title  # 系列名称
        catalogue.series_level = 0        # 系列等级
        catalogue.category_id = 0         # top-level rows use 0 as a sentinel category_id
        existed = Catalogue.select().where(
            (Catalogue.catalogue_name == title) & (Catalogue.category_id == 0)
        )
        if not existed:
            catalogue.save(force_insert=True)
            print("插入商品目录成功")

        # 此处获取父节点的id — parent row id for the children inserted below.
        _id = Catalogue.get(Catalogue.catalogue_name == title)._id
        for series_node in item.xpath('.//div[@class="class_child_li"]//li'):
            child = Catalogue()
            child.series_level = 0  # 系列等级

            series_name_0 = ''.join(series_node.xpath('.//span/text()').extract())
            href = ''.join(series_node.xpath(".//a[@href]").extract())
            # Pull the numeric category id out of the child link's href.
            category_id = re.search(r'\d.?\d', href).group()

            child.category_id = category_id        # 次级产品系列ID
            child.catalogue_name = series_name_0   # 次级产品系列的名称
            child.catalogue_level = 2              # 次级产品系列的等级
            child.father_id = _id                  # 父节点的ID
            existed = Catalogue.select().where(
                (Catalogue.catalogue_name == series_name_0)
                & (Catalogue.category_id == category_id)
            )
            if not existed:
                child.save(force_insert=True)
#根据catalogue存储的数据来获取category_id拼接商品最外层的url链接 def get_catalogue_url(): url_list = [] #catalogue = Catalogue() id_data = Catalogue.select().where(Catalogue.catalogue_level==2) for item in id_data: url = category_url + str(item.category_id) + "&okey=salenum&order=desc" url_list.append(url) #id_data = Catalogue.get(Catalogue.series_level_0==1).category_id return url_list
def parse_product(url):
    """Scrape one product-listing page, persist each product, then recurse to the next page.

    Skips items whose price is hidden (members-only / "ask the seller") and
    items with zero sales. Pagination stops when there is no next-page link
    or the last item parsed had zero sales — the listing is sorted by sales
    descending, so everything after that point sells nothing.
    """
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
    flag_num = 0  # sales count of the last item processed; gates pagination below
    for item in res_li:
        # 产品ID — numeric id embedded in the thumbnail link's href.
        raw_link = ''.join(item.xpath('./div[contains(@class,"pro_pic_box")]/a[@href]').extract())
        product_id = int(re.search(r'id=.*\d\"', raw_link).group().replace("id=", "").replace("\"", ""))

        # 产品名字
        name = ''.join(item.xpath("./div[@class='row row-2 title']/a/text()").extract())

        # 显示价格 — may be non-numeric when hidden from non-members.
        price = ''.join(item.xpath('./div[@id="goods_detail_b"]/div[@class="row row-1"]/div[@class="g_price fm2"]/strong/text()').extract())
        try:
            price = float(price)
        except ValueError:
            print("价格会员可见|价格请咨询商家")
            continue

        # 销售数量
        sales_text = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())
        sales_num = int(sales_text.split('销量:')[1])
        flag_num = sales_num
        if sales_num < 1:
            continue

        # Store id from the store link's href.
        raw_store = ''.join(item.xpath("./div[@class='row row-3 c']/a[@href]").extract())
        store_id = int(re.search(r'id=.*\d\"', raw_store).group().replace("id=", "").replace("\"", ""))

        merchant = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[2]/text()").extract())        # 商家
        main_Products = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[3]/text()").extract())   # 主营
        merchant_Place = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[4]/text()").extract())  # 地址

        product = Product()
        product.product_id = product_id
        product.name = name
        product.price = price
        product.sales_num = sales_num
        product.store_id = store_id
        product.merchant = merchant
        product.main_Products = main_Products
        product.merchant_Place = merchant_Place
        if not Product.select().where(Product.product_id == product_id):
            product.save(force_insert=True)

    # Follow the "next page" link while the page still yields selling items.
    next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
    if len(next_page) > 2 and flag_num > 0:
        url_next = re.search(r'\".*\d\"', next_page[-1]).group()
        url_next = url_next.replace("&amp;", "&")  # 此处&由于被转义成&amp;导致需要重新进行处理
        url_next = url_next.replace("\"", "")
        parse_product(parse.urljoin(domain, url_next))
# 获取商品链接,上一级url为商品详情页
def parse_data_last(url):
    """Scrape a listing page and push each selling product's URL onto Redis.

    Retries the HTTP fetch indefinitely with a 3-second back-off, pushes
    every item with sales > 0 onto the '91jiafan:catalogue_url' Redis list,
    then recurses onto the next page while items are still selling.
    """
    flag_num = 0  # sales count of the last item seen; gates pagination below
    # Fetch with retry: keep trying until requests succeeds.
    while True:
        try:
            res_text = requests.get(url).text
        except Exception:
            time.sleep(3)
            print('间隔休眠时间,再次处理')
        else:
            break

    sel = Selector(text=res_text)
    res_li = sel.xpath("//div[@class='pro_list_div g-clearfix c']/ul//li[@class='goods_offset']")
    for item in res_li:
        # 销售数量
        sales_text = ''.join(item.xpath("./div[@id='goods_detail_b']/div[2]/p[1]/text()").extract())
        sales_num = int(sales_text.split('销量:')[1])
        flag_num = sales_num

        # Single product link (listing is sorted by sales).
        data = item.xpath("./div[@class='pro_pic_box']/a").extract()
        data = re.search(r'\".*\d\"', data[0]).group()
        data = data.replace("&amp;", "&")  # undo HTML entity escaping
        data = data.replace("\"", "")
        data_url = parse.urljoin(domain, data)
        print("开始获取商品:{}".format(data_url))

        if sales_num > 0:
            # 此处存储商品的url,判断条件为销售数量大于0
            r.lpush('91jiafan:catalogue_url', data_url)

    # 此处代码用来切到下一页链接数据,商品的详情排布页
    next_page = sel.xpath("//*[@class='pagination2']/a[@href]").extract()
    if len(next_page) > 2 and flag_num > 0:
        url_next = re.search(r'\".*\d\"', next_page[-1]).group()
        url_next = url_next.replace("&amp;", "&")  # 此处&由于被转义成&amp;导致需要重新进行处理
        url_next = url_next.replace("\"", "")
        parse_data_last(parse.urljoin(domain, url_next))
product_attributes = Product_attributes() product_attributes.product_id = product_id product_attributes.price_base = price_base product_attributes.attributes = str_attributes product_attributes.buyer_num = buyer_num product_attributes.sale_num = sale_num product_attributes.buyer_rate = buyer_rate existed_id = Product_attributes.select().where(Product_attributes.product_id==product_id) if existed_id: pass #product_attributes.save() else: print("开始保存商品详细信息:{}".format(url)) product_attributes.save(force_insert=True) else : price = "价格请咨询商家"
#获取商户详细数据,处理逻辑为根据单个商品目录来获取对应的商户id def parse_store_id(url): #print(url) # 打印当前商品页的url用来定位 res_text = requests.get(url).text sel = Selector(text=res_text) store_id = 0 #筛选规则,当is_price之后的value属性值为0的时候,说明不需要咨询商家,同时需要注意的是,商品会有打折批次数量的差异导致价格差异, #这一点需要根据具体的显示页面来处理,现在忽略,由于可能存在打折段的数据差异,所以暂时不考虑 Is_price = sel.xpath("//input[contains(@id,'is_price')]").extract()#取到的数据用来判断价格是否需要咨询商家 if len(Is_price) < 1: print("页面数据为空") else: is_value = re.search('\d',Is_price[0]) if is_value.group() == '0': # 0表示商品价格不需要咨询商户 store_id = sel.xpath('//span[@class="container_title_span"]/a[@href]').extract() store_id = ''.join(store_id) store_id = re.search('storeid=\d*\"',store_id) store_id = store_id.group() store_id = store_id.split('storeid=')[1] store_id = store_id.replace("\"","") store_id = int(store_id) # 商户的id else : pass return store_id
# 根据store_id拼接的url用来抓取商户的数据
def parse_store_data(url):
    """Scrape a store page and persist the store profile if not already stored.

    Does nothing when the response body is effectively empty (<= 10 chars).
    Missing profile sections fall back to empty strings.
    """
    res_text = requests.get(url).text
    sel = Selector(text=res_text)
    if len(res_text) > 10:
        # 商户的名字
        store_name = ''.join(sel.xpath('//span[contains(@class,"container_title_span")]/a[@href]/text()').extract())

        # 商户的id — parsed from the header store link.
        raw = ''.join(sel.xpath('//span[@class="container_title_span"]/a[@href]').extract())
        matched = re.search(r'storeid=\d*\"', raw).group()
        store_id = int(matched.split('storeid=')[1].replace("\"", ""))

        # Defaults for sections that may be absent from the page.
        store_level = ''
        store_place = ''
        store_describe = ''
        store_supply = ''
        store_service = ''

        store_data = sel.xpath('//ul[contains(@class,"gy_info_list")]/li/text()').extract()
        if len(store_data) > 3:
            store_level = store_data[2].replace(" ", "").replace("\n", "")  # 商户等级
            store_place = store_data[3].replace(" ", "")                    # 商户地址

        store_aptitude = stor_url_aptitude + str(store_id)  # 商户的资质 (certification page URL)

        temp_datas = sel.xpath('//li[contains(@class,"evaluate")]//div[@style]//text()').extract()
        if len(temp_datas) == 6:
            store_describe = temp_datas[0] + ':' + temp_datas[1]  # 商户描述
            store_supply = temp_datas[2] + ':' + temp_datas[3]    # 商户供货
            store_service = temp_datas[4] + ':' + temp_datas[5]   # 商户服务

        store = Store()
        store.store_id = store_id
        store.store_name = store_name
        store.store_level = store_level
        store.store_place = store_place
        store.store_aptitude = store_aptitude
        store.store_describe = store_describe
        store.store_supply = store_supply
        store.store_service = store_service

        if not Store.select().where(Store.store_id == store_id):
            print("开始获取商户信息:{}".format(store_id))
            store.save(force_insert=True)
class ParseproductThread(Thread): def run(self): while(1): try: data = r.lpop('91jiafan:catalogue_url') print("开始处理商品:{}".format(data)) parse_product_data(data) store_id = parse_store_id(data) store_id_url = store_url_domain + str(store_id) r.lpush('91jiafan:store_id_url',store_id_url) except: time.sleep(120) print("data is null")
class Parse_storedata_Thread(Thread): def run(self): while(1): try: data = r.lpop('91jiafan:store_id_url') print("开始处理商户:{}".format(data)) parse_store_data(data) except: time.sleep(120) print("data is null")
class parse_91_productdata_Thread(Thread): def run(self): #提取商品列表页的数据 url_list = get_catalogue_url() for url in url_list: parse_product(url)
#end_time = datetime.now() #print("一共使用时间:",end_time - start_time)
# 标签: product, url, text, 数据库, redis, catalogue, odps, id, store
# 来源: https://www.cnblogs.com/dog-and-cat/p/13355356.html