Celery task call hangs on delay and apply_async

  • Author: 海面-并不平
  • Source: 51数据库
  • 2022-11-30

Problem description

I created a Celery app with the following directory structure (as shown on the Celery site):

proj
|-- celery.py
|-- celery.pyc
|-- __init__.py
|-- __init__.pyc
|-- tasks.py
`-- tasks.pyc

The contents of celery.py are as follows:

from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost',
             #backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()

The contents of tasks.py are as follows:

from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

I start the Celery worker with the following command:

celery -A proj worker -l debug

I think the worker is running fine, since it prints the following:

[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command.  It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.


  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@ansumanb-u12 v3.1.12 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x1f46690
- ** ---------- .> transport:   amqp://rabbitmquser:**@localhost:5672/localvhost
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection

After starting the worker, I open a terminal and run the following from the Python interpreter:

>>> from proj.tasks import add
>>> add(2,2)
4
>>> add.delay(2,3)

Here delay hangs (the same happens with apply_async). When I stop it with Ctrl+C, I get the following:

^CTraceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
    return self.apply_async(args, kwargs)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
    reply_to=reply_to or self.oid, **options
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
    **kwargs
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
    return fun(*args, **kwargs)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
    channel = self.channel
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
    channel = self._channel = channel()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
    self.connection
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
    (10, 10),  # start
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods)
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
    self.method_reader.read_method()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
    frame_type, channel, payload = read_frame()
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
    frame_type, channel, size = unpack('>BHI', read(7, True))
  File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
    s = recv(n - len(rbuf))
KeyboardInterrupt
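
The traceback stops in a blocking recv while the client waits for the broker's first frame (the "(10, 10),  # start" step), which suggests the TCP connection was opened but the broker never replied. A minimal sketch of the same check outside Celery, using only the standard library: it sends the AMQP 0-9-1 protocol header and waits a bounded time for any response. The function name, the default host and port, and the 3-second timeout are assumptions chosen for illustration; they are not part of Celery or kombu.

```python
import socket

# AMQP 0-9-1 protocol header that a client sends right after connecting.
AMQP_HEADER = b"AMQP\x00\x00\x09\x01"


def broker_answers_handshake(host="localhost", port=5672, timeout=3.0):
    """Return True if the broker sends any bytes back after the header.

    Hypothetical helper for diagnosis only. A False result means the
    connection failed, was closed, or the broker stayed silent until
    the timeout expired (the situation seen in the traceback above).
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.settimeout(timeout)
            sock.sendall(AMQP_HEADER)
            # A frame header is 7 bytes (type, channel, size).
            return len(sock.recv(7)) > 0
    except OSError:  # includes timeouts and refused connections
        return False


# Example use (not executed here):
#   broker_answers_handshake("localhost", 5672)
```

If this returns False while the port is open, the problem is likely on the broker side rather than in the task code.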

Any suggestions or comments would be much appreciated. I have gone through other links that talk about the size of the /var directory, but I think I have enough space.

Output of df -h:

Filesystem      Size  Used Avail Use% Mounted on
/dev/sda3       283G   99G  170G  37% /
udev            1.9G  4.0K  1.9G   1% /dev
tmpfs           388M  1.1M  387M   1% /run
none            5.0M     0  5.0M   0% /run/lock
none            1.9G   28M  1.9G   2% /run/shm

The following is the output of rabbitmqctl status:

Status of node 'rabbit@ansumanb-u12' ...
[{pid,12014},
 {running_applications,[{rabbit,"RabbitMQ","3.3.2"},
                        {os_mon,"CPO  CXC 138 46","2.2.7"},
                        {xmerl,"XML parser","1.2.10"},
                        {mnesia,"MNESIA  CXC 138 12","4.5"},
                        {sasl,"SASL  CXC 138 11","2.1.10"},
                        {stdlib,"ERTS  CXC 138 10","1.17.5"},
                        {kernel,"ERTS  CXC 138 10","2.14.5"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]
"},
 {memory,[{total,27919080},
          {connection_procs,2704},
          {queue_procs,5408},
          {plugins,0},
          {other_proc,9099992},
          {mnesia,63776},
          {mgmt_db,0},
          {msg_index,34080},
          {other_ets,784160},
          {binary,12144},
          {code,14685283},
          {atom,1367393},
          {other_system,1864140}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,1625165004},
 {disk_free_limit,50000000},
 {disk_free,181684699136},
 {file_descriptors,[{total_limit,2},
                    {total_used,0},
                    {sockets_limit,0},
                    {sockets_used,0}]},
 {processes,[{limit,1048576},{used,127}]},
 {run_queue,0},
 {uptime,20072}]
...done.

I checked the RabbitMQ logs and found nothing there. The Celery version is 3.1.12.

I created the RabbitMQ virtual host and user with the following commands:

$ sudo rabbitmqctl add_user rabbitmquser <mypassword>
$ sudo rabbitmqctl add_vhost localvhost
$ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"

Thanks.

Recommended answer

I made a big mistake: I changed a setting that I did not understand. I am adding this for others' reference.

While installing RabbitMQ, I changed the default ulimit value from 1024 to 100 in /etc/default/rabbitmq-server.

I changed the value back to 1024, and the issue is now fixed.
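
The rabbitmqctl status output in the question appears consistent with this: its file_descriptors entry reports total_limit 2 and sockets_limit 0. A minimal sketch, assuming the Erlang-term output format shown above (other RabbitMQ versions may print status differently), that pulls those two values out of the status text. The function name fd_limits and the sample string are illustrative only.

```python
import re


def fd_limits(status_text):
    """Extract total_limit and sockets_limit from `rabbitmqctl status` text.

    Hypothetical helper; expects entries shaped like {total_limit,2}.
    Keys that are not found are simply left out of the result.
    """
    limits = {}
    for key in ("total_limit", "sockets_limit"):
        match = re.search(r"\{%s,(\d+)\}" % key, status_text)
        if match:
            limits[key] = int(match.group(1))
    return limits


# Fragment copied from the status output in the question.
sample = """ {file_descriptors,[{total_limit,2},
                    {total_used,0},
                    {sockets_limit,0},
                    {sockets_used,0}]},"""

limits = fd_limits(sample)
print(limits)
if limits.get("sockets_limit") == 0:
    # A zero socket limit suggests the broker has no descriptors
    # available for client connections; check the configured ulimit.
    print("sockets_limit is 0; check the ulimit for rabbitmq-server")
```

After restoring the ulimit and restarting the broker, rerunning rabbitmqctl status should show whether these limits have changed.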
