Pika is a RabbitMQ (AMQP 0-9-1) client library for Python.
Introduction
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol including RabbitMQ's extensions.
- Supports Python 3.7+ (1.1.0 was the last version to support 2.7).
- Since threads aren't appropriate to every situation, it doesn't require threads. Pika core takes care not to forbid them, either. The same goes for greenlets, callbacks, continuations, and generators. An instance of Pika's built-in connection adapters isn't thread-safe, however.
- People may be using direct sockets, plain old select(), or any of the wide variety of ways of getting network events to and from a Python application. Pika tries to stay compatible with all of these, and to make adapting it to a new environment as simple as possible.

Documentation
Pika's documentation can be found at https://pika.readthedocs.io.

Example
Here is the simplest example of use, sending a message with the pika.BlockingConnection adapter:

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_publish(exchange='test', routing_key='test', body=b'Test message.')
    connection.close()

And an example of writing a blocking consumer:

    import pika

    connection = pika.BlockingConnection()
    channel = connection.channel()

    for method_frame, properties, body in channel.consume('test'):
        # Display the message parts and acknowledge the message
        print(method_frame, properties, body)
        channel.basic_ack(method_frame.delivery_tag)

        # Escape out of the loop after 10 messages
        if method_frame.delivery_tag == 10:
            break

    # Cancel the consumer and return any pending messages
    requeued_messages = channel.cancel()
    print('Requeued %i messages' % requeued_messages)
    connection.close()

Pika provides the following adapters
- pika.adapters.asyncio_connection.AsyncioConnection - asynchronous adapter for Python 3 AsyncIO's I/O loop.
- pika.BlockingConnection - synchronous adapter on top of library for simple usage.
- pika.SelectConnection - asynchronous adapter without third-party dependencies.
- pika.adapters.gevent_connection.GeventConnection - asynchronous adapter for use with Gevent's I/O loop.
- pika.adapters.tornado_connection.TornadoConnection - asynchronous adapter for use with Tornado's I/O loop.
- pika.adapters.twisted_connection.TwistedProtocolConnection - asynchronous adapter for use with Twisted's I/O loop.

Multiple connection parameters
You can also pass multiple pika.ConnectionParameters instances for fault-tolerance as in the code snippet below (host names are just examples, of course). To enable retries, set connection_attempts and retry_delay as needed in the last pika.ConnectionParameters element of the sequence. Retries occur after connection attempts using all of the given connection parameters fail.

    import pika

    parameters = (
        pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
        pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                                  connection_attempts=5, retry_delay=1))
    connection = pika.BlockingConnection(parameters)

With non-blocking adapters, such as pika.SelectConnection and pika.adapters.asyncio_connection.AsyncioConnection, you can request a connection using multiple connection parameter instances via the connection adapter's create_connection() class method.

Requesting message acknowledgements from another thread
The single-threaded usage constraint of an individual Pika connection adapter instance may result in a dropped AMQP/stream connection due to AMQP heartbeat timeout in consumers that take a long time to process an incoming message. A common solution is to delegate processing of the incoming messages to another thread, while the connection adapter's thread continues to service its I/O loop's message pump, permitting AMQP heartbeats and other I/O to be serviced in a timely fashion.
Messages processed in another thread may not be acknowledged directly from that thread, since all accesses to the connection adapter instance must be from a single thread, which is the thread running the adapter's I/O loop. Instead, the acknowledgement is performed by requesting a callback to be executed in the adapter's I/O loop thread. For example, the callback function's implementation might look like this:

    def ack_message(channel, delivery_tag):
        """Note that `channel` must be the same Pika channel instance via which
        the message being acknowledged was retrieved (AMQP protocol constraint).
        """
        if channel.is_open:
            channel.basic_ack(delivery_tag)
        else:
            # Channel is already closed, so we can't acknowledge this message;
            # log and/or do something that makes sense for your app in this case.
            pass

The code running in the other thread may request the ack_message() function to be executed in the connection adapter's I/O loop thread using an adapter-specific mechanism:
- pika.BlockingConnection abstracts its I/O loop from the application and thus exposes pika.BlockingConnection.add_callback_threadsafe(). Refer to this method's docstring for additional information. For example:

    import functools

    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))

- When using a non-blocking connection adapter, such as pika.adapters.asyncio_connection.AsyncioConnection or pika.SelectConnection, you use the underlying asynchronous framework's native API for requesting an I/O loop-bound callback from another thread. For example, pika.SelectConnection's I/O loop provides add_callback_threadsafe(), pika.adapters.tornado_connection.TornadoConnection's I/O loop has add_callback(), while pika.adapters.asyncio_connection.AsyncioConnection's I/O loop exposes call_soon_threadsafe().

This threadsafe callback request mechanism may also be used to delegate publishing of messages, etc., from a background thread to the connection adapter's thread.
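
The pieces above can be combined into a minimal sketch for pika.BlockingConnection. The names do_work and on_message, the args tuple and the placeholder processing step are hypothetical; only is_open, basic_ack() and add_callback_threadsafe() come from the text above. The code is duck-typed and does not import pika, so it works with any connection and channel objects that expose those attributes.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Runs in the adapter's I/O loop thread; acks if the channel is still open."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body):
    """Runs in a worker thread (hypothetical helper)."""
    # Placeholder for long-running, application-specific processing of `body`.
    _ = body
    # Never touch `channel` from this thread; hand the ack back to the
    # connection adapter's thread instead.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method_frame, properties, body, args):
    """Consumer callback: delegates processing to a new worker thread."""
    connection, threads = args
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method_frame.delivery_tag, body))
    worker.start()
    # Keep a reference so the application can join the workers on shutdown.
    threads.append(worker)
```

With a real connection, on_message would typically be wrapped as functools.partial(on_message, args=(connection, threads)) and passed as the on_message_callback to channel.basic_consume(). The worker threads would then be joined once start_consuming() returns.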

Connection recovery
Some RabbitMQ clients (Bunny, Java, .NET, Objective-C, Swift) provide a way to automatically recover a connection, its channels and topology (e.g. queues, bindings and consumers) after a network failure. Others require connection recovery to be performed by the application code and strive to make it a straightforward process. Pika falls into the second category.
Pika supports multiple connection adapters. They take different approaches to connection recovery.
For the pika.BlockingConnection adapter, exception handling can be used to check for connection errors. Here is a very basic example:

    import pika

    while True:
        try:
            connection = pika.BlockingConnection()
            channel = connection.channel()
            channel.basic_consume('test', on_message_callback)
            channel.start_consuming()
        # Don't recover if connection was closed by broker
        except pika.exceptions.ConnectionClosedByBroker:
            break
        # Don't recover on channel errors
        except pika.exceptions.AMQPChannelError:
            break
        # Recover on all other connection errors
        except pika.exceptions.AMQPConnectionError:
            continue

This example can be found in examples/consume_recover.py.
Generic operation retry libraries such as retry can be used. Decorators make it possible to configure some additional recovery behaviours, like delays between retries and limiting the number of retries:

    import pika
    from retry import retry

    @retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume():
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)

        try:
            channel.start_consuming()
        # Don't recover connections closed by server
        except pika.exceptions.ConnectionClosedByBroker:
            pass

    consume()

This example can be found in examples/consume_recover_retry.py.
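
Where adding a third-party dependency isn't desirable, the same delay and retry-limit behaviour can be approximated by hand. The following is a minimal sketch and not part of Pika: run_with_recovery, its parameters and the injected sleep function are hypothetical names. The caller supplies the exception classes, for example pika.exceptions.AMQPConnectionError as recoverable, and pika.exceptions.ConnectionClosedByBroker and pika.exceptions.AMQPChannelError as fatal.

```python
import time


def run_with_recovery(consume_once, recoverable, fatal=(),
                      max_attempts=5, delay=5.0, sleep=time.sleep):
    """Call consume_once() until it returns, a fatal error occurs, or
    max_attempts recoverable failures have been seen.

    Returns True if consume_once() returned normally and False if it stopped
    on a fatal error. Re-raises the last recoverable error once the attempts
    are used up.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            consume_once()
            return True
        except fatal:
            # Checked first, because a fatal class may be a subclass of a
            # recoverable one.
            return False
        except recoverable:
            if attempt == max_attempts:
                raise
            # Wait before the next attempt.
            sleep(delay)
```

Here consume_once would be an application function that opens a connection, registers the consumer and calls start_consuming(), much like the body of the try block in the basic example. Passing sleep as a parameter is only a convenience that makes the delay easy to replace in tests.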
For asynchronous adapters, use on_close_callback to react to connection failure events. This callback can be used to clean up and recover the connection.
An example of recovery using on_close_callback can be found in examples/asynchronous_consumer_example.py.
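
As a rough illustration of the idea, and not a copy of that example file, the sketch below stops the I/O loop from the close callback and lets an outer loop create a fresh connection. ReconnectingRunner, the connect factory and the injected sleep function are hypothetical names. The sketch also assumes that the adapter calls on_close_callback with the connection and the reason for the closure, and that the connection exposes ioloop.start() and ioloop.stop(), as pika.SelectConnection does.

```python
import time


class ReconnectingRunner:
    """Re-creates an asynchronous connection whenever it closes unexpectedly.

    `connect` is a hypothetical factory supplied by the application. It must
    accept an `on_close_callback` keyword argument and return a connection
    object that exposes `ioloop.start()` and `ioloop.stop()`.
    """

    def __init__(self, connect, reconnect_delay=5.0, sleep=time.sleep):
        self._connect = connect
        self._reconnect_delay = reconnect_delay
        self._sleep = sleep
        self._stopping = False
        self._should_reconnect = False

    def on_connection_closed(self, connection, reason):
        """Intended to be used as the adapter's on_close_callback."""
        # A close that was not requested through stop() triggers recovery.
        self._should_reconnect = not self._stopping
        # Stop the I/O loop so that run() regains control.
        connection.ioloop.stop()

    def stop(self):
        """Mark the next close as intentional, so run() will return."""
        self._stopping = True

    def run(self):
        while True:
            self._should_reconnect = False
            connection = self._connect(
                on_close_callback=self.on_connection_closed)
            connection.ioloop.start()  # Blocks until the loop is stopped.
            if not self._should_reconnect:
                break
            self._sleep(self._reconnect_delay)
```

With pika.SelectConnection, the factory might be a small function that returns pika.SelectConnection(parameters, on_open_callback=on_open, on_close_callback=on_close_callback), where parameters and on_open are defined by the application. Failures of the initial connection attempt are reported separately through on_open_error_callback and are not handled in this sketch.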

Contributing
To contribute to Pika, please make sure that any new features or changes to existing functionality include test coverage.
Pull requests that add or change code without adequate test coverage will be rejected.
Additionally, please format your code using Yapf with google style prior to issuing your pull request. Note: only format those lines that you have changed in your pull request. If you format an entire file and change code outside of the scope of your PR, it will likely be rejected.

Extending to support additional I/O frameworks
New non-blocking adapters may be implemented in either of the following ways:
- By subclassing pika.BaseConnection, implementing its abstract method and passing its constructor an implementation of pika.adapters.utils.nbio_interface.AbstractIOServices. pika.BaseConnection implements pika.connection.Connection's abstract methods, including internally-initiated connection logic. For examples, refer to the implementations of pika.adapters.asyncio_connection.AsyncioConnection, pika.adapters.gevent_connection.GeventConnection and pika.adapters.tornado_connection.TornadoConnection.
- By subclassing pika.connection.Connection and implementing its abstract methods. This approach facilitates implementation of custom connection-establishment and transport mechanisms. For an example, refer to the implementation of pika.adapters.twisted_connection.TwistedProtocolConnection.