数据库
首页 > 数据库> > python – 什么是使用django芹菜和redis异步刮取并存储我的结果的正确方法并存储我的?

python – 什么是使用django芹菜和redis异步刮取并存储我的结果的正确方法并存储我的?

作者:互联网

当我尝试使用我在django应用程序中创建的函数进行刮擦时,我一直在试图理解我的问题.该功能转到网站收集数据并将其存储在我的数据库中.起初我尝试使用rq和redis一段时间,但我不断收到错误消息.所以有人认为我应该尝试使用芹菜,我做到了.但我现在看到rq和芹菜都是问题所在.因为我收到的错误信息与之前相同.我累了导入它,但仍然得到了错误信息,然后我想好了也许如果我在tasks.py文件中有实际的功能,它会产生影响,但事实并非如此.继承了我试图在tasks.py中使用的功能

import requests
from bs4 import BeautifulSoup
from src.blog.models import Post
import random
import re
from django.contrib.auth.models import User
import os

@app.tasks
def p_panties():
    def swappo():
        user_one = ' "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:24.0) Gecko/20100101 Firefox/24.0" '
        user_two = ' "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5)" '
        user_thr = ' "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko" '
        user_for = ' "Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:10.0) Gecko/20100101 Firefox/10.0" '

        agent_list = [user_one, user_two, user_thr, user_for]
        a = random.choice(agent_list)
        return a

    headers = {
        "user-agent": swappo(),
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "accept-charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.3",
        "accept-encoding": "gzip,deflate,sdch",
        "accept-language": "en-US,en;q=0.8",
    }

    pan_url = 'http://www.example.org'
    shtml = requests.get(pan_url, headers=headers)
    soup = BeautifulSoup(shtml.text, 'html5lib')
    video_row = soup.find_all('div', {'class': 'post-start'})
    name = 'pan videos'

    if os.getenv('_system_name') == 'OSX':
        author = User.objects.get(id=2)
    else:
        author = User.objects.get(id=3)

    def youtube_link(url):
        youtube_page = requests.get(url, headers=headers)
        soupdata = BeautifulSoup(youtube_page.text, 'html5lib')
        video_row = soupdata.find_all('p')[0]
        entries = [{'text': div,
                    } for div in video_row]
        tubby = str(entries[0]['text'])
        urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', tubby)
        cleaned_url = urls[0].replace('?&autoplay=1', '')
        return cleaned_url

    def yt_id(code):
        the_id = code
        youtube_id = the_id.replace('https://www.youtube.com/embed/', '')
        return youtube_id

    def strip_hd(hd, move):
        str = hd
        new_hd = str.replace(move, '')
        return new_hd

    entries = [{'href': div.a.get('href'),
                'text': strip_hd(strip_hd(div.h2.text, '– Official video HD'), '– Oficial video HD').lstrip(),
                'embed': youtube_link(div.a.get('href')), #embed
                'comments': strip_hd(strip_hd(div.h2.text, '– Official video HD'), '– Oficial video HD').lstrip(),
                'src': 'https://i.ytimg.com/vi/' + yt_id(youtube_link(div.a.get('href'))) + '/maxresdefault.jpg', #image
                'name': name,
                'url': div.a.get('href'),
                'author': author,
                'video': True

                } for div in video_row][:13]

    for entry in entries:
        post = Post()
        post.title = entry['text']
        title = post.title
        if not Post.objects.filter(title=title):
            post.title = entry['text']
            post.name = entry['name']
            post.url = entry['url']
            post.body = entry['comments']
            post.image_url = entry['src']
            post.video_path = entry['embed']
            post.author = entry['author']
            post.video = entry['video']
            post.status = 'draft'
            post.save()
            post.tags.add("video", "Musica")
    return entries

如果我运行,在python shell中

from tasks import *

我明白了

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/ray/Desktop/myheroku/practice/tasks.py", line 5, in <module>
    from src.blog.models import Post
  File "/Users/ray/Desktop/myheroku/practice/src/blog/models.py", line 3, in <module>
    from taggit.managers import TaggableManager
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/taggit/managers.py", line 7, in <module>
    from django.contrib.contenttypes.models import ContentType
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/contrib/contenttypes/models.py", line 159, in <module>
    class ContentType(models.Model):
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/contrib/contenttypes/models.py", line 160, in ContentType
    app_label = models.CharField(max_length=100)
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/db/models/fields/__init__.py", line 1072, in __init__
    super(CharField, self).__init__(*args, **kwargs)
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/db/models/fields/__init__.py", line 166, in __init__
    self.db_tablespace = db_tablespace or settings.DEFAULT_INDEX_TABLESPACE
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/conf/__init__.py", line 55, in __getattr__
    self._setup(name)
  File "/Users/ray/Desktop/myheroku/practice/lib/python3.5/site-packages/django/conf/__init__.py", line 41, in _setup
    % (desc, ENVIRONMENT_VARIABLE))
django.core.exceptions.ImproperlyConfigured: Requested setting DEFAULT_INDEX_TABLESPACE, but settings are not configured. You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings.

这是我用rq和redis完全相同的追溯.我发现如果我像这样修改导入

import requests
from bs4 import BeautifulSoup
# from src.blog.models import Post
import random
import re
# from django.contrib.auth.models import User
import os

并修改我的功能

@app.task
def p_panties():
    def swappo():
        user_one = ' "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:24.0) Gecko/20100101 Firefox/24.0" '
        user_two = ' "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_5)" '
        user_thr = ' "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko" '
        user_for = ' "Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:10.0) Gecko/20100101 Firefox/10.0" '

        agent_list = [user_one, user_two, user_thr, user_for]
        a = random.choice(agent_list)
        return a

    headers = {
        "user-agent": swappo(),
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "accept-charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.3",
        "accept-encoding": "gzip,deflate,sdch",
        "accept-language": "en-US,en;q=0.8",
    }

    pan_url = 'http://www.example.org'
    shtml = requests.get(pan_url, headers=headers)
    soup = BeautifulSoup(shtml.text, 'html5lib')
    video_row = soup.find_all('div', {'class': 'post-start'})
    name = 'pan videos'

    # if os.getenv('_system_name') == 'OSX':
    #     author = User.objects.get(id=2)
    # else:
    #     author = User.objects.get(id=3)

    def youtube_link(url):
        youtube_page = requests.get(url, headers=headers)
        soupdata = BeautifulSoup(youtube_page.text, 'html5lib')
        video_row = soupdata.find_all('p')[0]
        entries = [{'text': div,
                    } for div in video_row]
        tubby = str(entries[0]['text'])
        urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', tubby)
        cleaned_url = urls[0].replace('?&amp;autoplay=1', '')
        return cleaned_url

    def yt_id(code):
        the_id = code
        youtube_id = the_id.replace('https://www.youtube.com/embed/', '')
        return youtube_id

    def strip_hd(hd, move):
        str = hd
        new_hd = str.replace(move, '')
        return new_hd

    entries = [{'href': div.a.get('href'),
                'text': strip_hd(strip_hd(div.h2.text, '– Official video HD'), '– Oficial video HD').lstrip(),
                'embed': youtube_link(div.a.get('href')), #embed
                'comments': strip_hd(strip_hd(div.h2.text, '– Official video HD'), '– Oficial video HD').lstrip(),
                'src': 'https://i.ytimg.com/vi/' + yt_id(youtube_link(div.a.get('href'))) + '/maxresdefault.jpg', #image
                'name': name,
                'url': div.a.get('href'),
                # 'author': author,
                'video': True

                } for div in video_row][:13]
    #
    # for entry in entries:
    #     post = Post()
    #     post.title = entry['text']
    #     title = post.title
    #     if not Post.objects.filter(title=title):
    #         post.title = entry['text']
    #         post.name = entry['name']
    #         post.url = entry['url']
    #         post.body = entry['comments']
    #         post.image_url = entry['src']
    #         post.video_path = entry['embed']
    #         post.author = entry['author']
    #         post.video = entry['video']
    #         post.status = 'draft'
    #         post.save()
    #         post.tags.add("video", "Musica")
    return entries

它有效,因为这是我的输出

[2016-08-13 08:31:17,222: INFO/MainProcess] Received task: tasks.p_panties[e196c6bf-2b87-4bb2-ae11-452e3c41434f]
[2016-08-13 08:31:17,238: INFO/Worker-4] Starting new HTTP connection (1): www.example.org
[2016-08-13 08:31:17,582: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:18,314: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:18,870: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:19,476: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:20,089: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:20,711: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:21,218: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:21,727: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:22,372: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:22,785: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:23,375: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:23,983: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:24,396: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:25,003: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:25,621: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:26,029: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:26,446: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:27,261: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:27,671: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:28,082: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:28,694: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:29,311: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:29,922: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:30,535: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:31,154: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:31,765: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:32,387: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:32,992: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:33,611: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:34,030: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:34,635: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:35,041: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:35,659: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:36,278: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:36,886: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:37,496: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:37,913: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:38,564: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:39,143: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:39,754: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:40,409: INFO/Worker-4] Starting new HTTP connection (1): example.org
[2016-08-13 08:31:40,992: INFO/MainProcess] Task tasks.p_panties[e196c6bf-2b87-4bb2-ae11-452e3c41434f] succeeded in 23.767645187006565s: [{'src': 'https://i.ytimg.com/vi/3bU-AtShW7Y/maxresdefault.jpg', 'name': 'pan videos', 'url':...

似乎需要某种类型的授权才能与我的Post模型进行交互.我只是不知道如何.我一直在搜索关于如何刮取数据并将数据保存到数据库中的示例.奇怪的是我没遇到过.我可以阅读的任何建议提示文档将是一个很大的帮助.

编辑

我的文件结构

environ\
  |-src\
     |-blog\
        |-migrations\
        |-static\
        |-templates\
        |-templatetags\
        |-__init__.py
        |-admin.py
        |-forms.py
        |-models
        |-tasks
        |-urls
        |-views

解决方法:

你需要设置Django

您似乎试图在Python shell中运行您的任务,因为您的代码在注释掉Django模型部分时会更有效.

所以问题是,当运行纯python shell时,需要设置Django才能正常运行.当您通过manage.py shell运行它时,manage.py会照顾或为您设置它,但是通过python脚本执行它需要手动设置.这是丢失DJANGO_SETTINGS_MODULE错误的原因.

您似乎也使用了已定义的模型,以便能够将它们导入到python脚本中,您需要将项目的根文件夹的路径添加到当前的python路径.

最后,你需要告诉django你的设置文件在哪里(在设置你的django之前),在你的manage.py文件中,你应该有这样的东西:

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")

将它设为常量,将其命名为DEFAULT_SETTINGS_MODULE,因此您现在拥有:

os.environ.setdefault("DJANGO_SETTINGS_MODULE", DEFAULT_SETTINGS_MODULE)

现在您需要将常量导入到脚本中并告诉django(通过设置env var)它应该在哪里查找设置文件.

总而言之:

import sys, os
sys.path.insert(0, "/path/to/parent/of/src") # /home/projects/my-crawler

from manage import DEFAULT_SETTINGS_MODULE
os.environ.setdefault("DJANGO_SETTINGS_MODULE", DEFAULT_SETTINGS_MODULE)

import django
django.setup() 
... The rest of your script ...

这样你就可以正常安装了.但是如果你想运行芹菜任务,你应该使用.delay()或.apply_async(),以确保代码在后台运行.

我自己的建议是使用python manage.py shell运行python shell,这种情况下django会为你处理一切.您只需导入任务并运行它.

还有关于存储刮擦任务的结果,你可以在数据库中,或者在redis中,或者在任何地方(文件,另一个Web服务器……等),你也可以调用另一个celery任务来处理结果并将条目传递给它).

只需将其添加到任务代码的末尾即可.

Redis的

from redis import StrictRedis

redis = StrictRedis(host='localhost', port=6379, db=0)

redis.set("scraping:tasks:results:TASK-ID-HERE", json.dumps(entries))

这是保存结果的最简单方法,但您也可以使用Redis lists/maps.

仅供参考,您可以使用列表进行操作

with redis.pipeline() as pipe:
    for item in entries:
        pipe.rpush("scraping:tasks:results", json.dumps(item))
    pipe.execute()

—-编辑

正如我所提到的,您可以定义另一个芹菜任务来处理当前抓取的结果.所以基本上你有以下内容:

@celery_app.task
def handle_scraping_results(entries):
    you do whatever you want with the entries array now

并在你的p_panties任务结束时调用它,如下所示:

handle_scraping_results.delay(entries)

RabbitMQ在这里做的是将p_panties任务中的消息传递给handle_scraping_results任务.您需要注意这些不是简单的功能,共享相同的内存地址空间,它们可以位于不同的服务器上的不同进程中!这实际上是芹菜的用途.您无法调用处于不同进程中的函数. RabbitMQ在这里出现并从进程A(具有任务p_panties)接收消息,并将其传递给进程B(具有任务handle_result)(消息传递是RPC的完美方法).

你不能在rabbitmq中保存任何东西,它不像redis.我引诱你在celery阅读更多内容,因为你似乎错误地选择了它.使用芹菜不会解决你的问题,它实际上增加了它(因为它可能在开始时很难理解).如果您不需要异步处理,只需完全摆脱芹菜.让你的代码成为一个单独的函数,你可以很容易地从python shell或manage.py shell中调用它,就像我上面描述的那样.

———编辑II

您希望每隔几个小时在数据库中保留一次.因此,只要您的任务在某处完成或者结果丢失,您就必须坚持下去.

你有两个选择

>在任务完成时保留DB(这不会是每隔几个小时)
>要在任务完成时持续使用Redis,然后每隔几个小时左右,您将拥有一个periodic task,将它们保存在django数据库中.

第一种方法很简单,只需取消注释您在自己的代码中注释的代码即可.
第二种方式需要更多的工作.

考虑到你的结果是在我告诉你的redis中持久存在的,你可以像下面这样定期执行任务来处理持久存储到DB中.

redis_keys = redis.get("scraping:tasks:results:*")

for key in redis_keys:
    value_of_redis_key = redis.get(key)
    entries = json.loads(entries)
    for entry in entries:
        post = Post()
        post.title = entry['text']
        title = post.title
        if not Post.objects.filter(title=title):
            post.title = entry['text']
            post.name = entry['name']
            post.url = entry['url']
            post.body = entry['comments']
            post.image_url = entry['src']
            post.video_path = entry['embed']
            post.author = entry['author']
            post.video = entry['video']
            post.status = 'draft'
            post.save()
            post.tags.add("video", "Musica")

标签:python,django,rabbitmq,queue,django-celery
来源: https://codeday.me/bug/20190527/1166041.html