Skip to content

Commit

Permalink
feat: migrate to new pamqp
Browse files Browse the repository at this point in the history
  • Loading branch information
galuszkak committed Oct 18, 2020
1 parent d97fd2b commit c869a98
Show file tree
Hide file tree
Showing 31 changed files with 965 additions and 278 deletions.
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
dist: xenial
language: python
python:
- 2.7
- 3.4
- 3.5
- 3.6
- 3.7
- 3.8
services:
- docker
install:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
rabbitmq:
image: rabbitmq:3.7
image: rabbitmq:3.8
ports:
- 5672
- 15672
2 changes: 1 addition & 1 deletion docs/api/exceptions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rabbitpy contains two types of exceptions, exceptions that are specific to rabbi
self._write_frame(frame_value)
File "rabbitpy/base.py", line 311, in _write_frame
raise exception
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.specification.Channel.Close object at 0x10e86bd50>
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.commands.Channel.Close object at 0x10e86bd50>
In this example, the channel that was created on the second line was closed and RabbitMQ is raising the :class:`AMQPPreconditionFailed <rabbitpy.exceptions.AMQPPreconditionFailed>` exception via RPC sent to your application using the AMQP Channel.Close method.

Expand Down
43 changes: 21 additions & 22 deletions rabbitpy/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
AMQP Adapter
"""
from pamqp import specification as spec

from pamqp import commands
from rabbitpy import base
from rabbitpy import message
from rabbitpy import exceptions
Expand Down Expand Up @@ -35,7 +34,7 @@ def basic_ack(self, delivery_tag=0, multiple=False):
:param bool multiple: Acknowledge multiple messages
"""
self._write_frame(spec.Basic.Ack(delivery_tag, multiple))
self._write_frame(commands.Basic.Ack(delivery_tag, multiple))

def basic_consume(self, queue='', consumer_tag='', no_local=False,
no_ack=False, exclusive=False, nowait=False,
Expand Down Expand Up @@ -70,7 +69,7 @@ def basic_consume(self, queue='', consumer_tag='', no_local=False,
consumer_tag = self.consumer_tag
# pylint: disable=protected-access
self.channel._consumers[consumer_tag] = (self, no_ack)
self._rpc(spec.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
self._rpc(commands.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
exclusive, nowait, arguments))
self._consuming = True
try:
Expand Down Expand Up @@ -119,7 +118,7 @@ def basic_get(self, queue='', no_ack=False):
:param bool no_ack: No acknowledgement needed
"""
self._rpc(spec.Basic.Get(0, queue, no_ack))
self._rpc(commands.Basic.Get(0, queue, no_ack))

def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
"""Reject one or more incoming messages.
Expand All @@ -137,7 +136,7 @@ def basic_nack(self, delivery_tag=0, multiple=False, requeue=True):
:param bool requeue: Requeue the message
"""
self._write_frame(spec.Basic.Nack(delivery_tag, multiple, requeue))
self._write_frame(commands.Basic.Nack(delivery_tag, multiple, requeue))

def basic_publish(self, exchange='', routing_key='', body='',
properties=None, mandatory=False, immediate=False):
Expand Down Expand Up @@ -178,7 +177,7 @@ def basic_qos(self, prefetch_size=0, prefetch_count=0, global_flag=False):
:param bool global_flag: Apply to entire connection
"""
self._rpc(spec.Basic.Qos(prefetch_size, prefetch_count, global_flag))
self._rpc(commands.Basic.Qos(prefetch_size, prefetch_count, global_flag))

def basic_reject(self, delivery_tag=0, requeue=True):
"""Reject an incoming message
Expand All @@ -192,7 +191,7 @@ def basic_reject(self, delivery_tag=0, requeue=True):
:param bool requeue: Requeue the message
"""
self._write_frame(spec.Basic.Reject(delivery_tag, requeue))
self._write_frame(commands.Basic.Reject(delivery_tag, requeue))

def basic_recover(self, requeue=False):
"""Redeliver unacknowledged messages
Expand All @@ -204,14 +203,14 @@ def basic_recover(self, requeue=False):
:param bool requeue: Requeue the message
"""
self._rpc(spec.Basic.Recover(requeue))
self._rpc(commands.Basic.Recover(requeue))

def confirm_select(self):
"""This method sets the channel to use publisher acknowledgements. The
client can only use this method on a non-transactional channel.
"""
self._rpc(spec.Confirm.Select())
self._rpc(commands.Confirm.Select())

def exchange_declare(self, exchange='', exchange_type='direct',
passive=False, durable=False, auto_delete=False,
Expand All @@ -232,7 +231,7 @@ def exchange_declare(self, exchange='', exchange_type='direct',
:param dict arguments: Arguments for declaration
"""
self._rpc(spec.Exchange.Declare(0, exchange, exchange_type, passive,
self._rpc(commands.Exchange.Declare(0, exchange, exchange_type, passive,
durable, auto_delete, internal, nowait,
arguments))

Expand All @@ -248,7 +247,7 @@ def exchange_delete(self, exchange='', if_unused=False,
:param bool nowait: Do not send a reply method
"""
self._rpc(spec.Exchange.Delete(0, exchange, if_unused, nowait))
self._rpc(commands.Exchange.Delete(0, exchange, if_unused, nowait))

def exchange_bind(self, destination='', source='',
routing_key='', nowait=False, arguments=None):
Expand All @@ -263,7 +262,7 @@ def exchange_bind(self, destination='', source='',
:param dict arguments: Optional arguments
"""
self._rpc(spec.Exchange.Bind(0, destination, source, routing_key,
self._rpc(commands.Exchange.Bind(0, destination, source, routing_key,
nowait, arguments))

def exchange_unbind(self, destination='', source='',
Expand All @@ -279,7 +278,7 @@ def exchange_unbind(self, destination='', source='',
:param dict arguments: Optional arguments
"""
self._rpc(spec.Exchange.Unbind(0, destination, source, routing_key,
self._rpc(commands.Exchange.Unbind(0, destination, source, routing_key,
nowait, arguments))

def queue_bind(self, queue='', exchange='', routing_key='',
Expand All @@ -298,7 +297,7 @@ def queue_bind(self, queue='', exchange='', routing_key='',
:param dict arguments: Arguments for binding
"""
self._rpc(spec.Queue.Bind(0, queue, exchange, routing_key, nowait,
self._rpc(commands.Queue.Bind(0, queue, exchange, routing_key, nowait,
arguments))

def queue_declare(self, queue='', passive=False, durable=False,
Expand All @@ -319,7 +318,7 @@ def queue_declare(self, queue='', passive=False, durable=False,
:param dict arguments: Arguments for declaration
"""
self._rpc(spec.Queue.Declare(0, queue, passive, durable, exclusive,
self._rpc(commands.Queue.Declare(0, queue, passive, durable, exclusive,
auto_delete, nowait, arguments))

def queue_delete(self, queue='', if_unused=False, if_empty=False,
Expand All @@ -336,7 +335,7 @@ def queue_delete(self, queue='', if_unused=False, if_empty=False,
:param bool nowait: Do not send a reply method
"""
self._rpc(spec.Queue.Delete(0, queue, if_unused, if_empty, nowait))
self._rpc(commands.Queue.Delete(0, queue, if_unused, if_empty, nowait))

def queue_purge(self, queue='', nowait=False):
"""Purge a queue
Expand All @@ -348,7 +347,7 @@ def queue_purge(self, queue='', nowait=False):
:param bool nowait: Do not send a reply method
"""
self._rpc(spec.Queue.Purge(0, queue, nowait))
self._rpc(commands.Queue.Purge(0, queue, nowait))

def queue_unbind(self, queue='', exchange='', routing_key='',
arguments=None):
Expand All @@ -362,7 +361,7 @@ def queue_unbind(self, queue='', exchange='', routing_key='',
:param dict arguments: Arguments of binding
"""
self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key,
self._rpc(commands.Queue.Unbind(0, queue, exchange, routing_key,
arguments))

def tx_select(self):
Expand All @@ -373,7 +372,7 @@ def tx_select(self):
or Rollback methods.
"""
self._rpc(spec.Tx.Select())
self._rpc(commands.Tx.Select())

def tx_commit(self):
"""Commit the current transaction
Expand All @@ -383,7 +382,7 @@ def tx_commit(self):
immediately after a commit.
"""
self._rpc(spec.Tx.Commit())
self._rpc(commands.Tx.Commit())

def tx_rollback(self):
"""Abandon the current transaction
Expand All @@ -395,4 +394,4 @@ def tx_rollback(self):
recover call should be issued.
"""
self._rpc(spec.Tx.Rollback())
self._rpc(commands.Tx.Rollback())
20 changes: 10 additions & 10 deletions rabbitpy/amqp_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import logging
import warnings

from pamqp import specification
from pamqp import commands

from rabbitpy import base
from rabbitpy import exceptions
Expand Down Expand Up @@ -172,12 +172,12 @@ def bind(self, source, routing_key=None, arguments=None):
"""
if hasattr(source, 'name'):
source = source.name
frame = specification.Queue.Bind(queue=self.name,
frame = commands.Queue.Bind(queue=self.name,
exchange=source,
routing_key=routing_key or '',
arguments=arguments)
response = self._rpc(frame)
return isinstance(response, specification.Queue.BindOk)
return isinstance(response, commands.Queue.BindOk)

def consume(self, no_ack=False, prefetch=None, priority=None,
consumer_tag=None):
Expand Down Expand Up @@ -286,7 +286,7 @@ def delete(self, if_unused=False, if_empty=False):
:param bool if_empty: Delete only if empty
"""
self._rpc(specification.Queue.Delete(queue=self.name,
self._rpc(commands.Queue.Delete(queue=self.name,
if_unused=if_unused,
if_empty=if_empty))

Expand All @@ -305,7 +305,7 @@ def get(self, acknowledge=True):
:rtype: :class:`~rabbitpy.Message` or None
"""
self._write_frame(specification.Basic.Get(queue=self.name,
self._write_frame(commands.Basic.Get(queue=self.name,
no_ack=not acknowledge))

return self.channel._get_message() # pylint: disable=protected-access
Expand All @@ -332,7 +332,7 @@ def ha_declare(self, nodes=None):

def purge(self):
"""Purge the queue of all of its messages."""
self._rpc(specification.Queue.Purge())
self._rpc(commands.Queue.Purge())

def stop_consuming(self):
"""Stop consuming messages. This is usually invoked if you want to
Expand Down Expand Up @@ -361,7 +361,7 @@ def unbind(self, source, routing_key=None):
if hasattr(source, 'name'):
source = source.name
routing_key = routing_key or self.name
self._rpc(specification.Queue.Unbind(queue=self.name, exchange=source,
self._rpc(commands.Queue.Unbind(queue=self.name, exchange=source,
routing_key=routing_key))

def _consume(self, no_ack=False, prefetch=None, priority=None):
Expand All @@ -381,12 +381,12 @@ def _consume(self, no_ack=False, prefetch=None, priority=None):
self.consuming = True

def _declare(self, passive=False):
"""Return a specification.Queue.Declare class pre-composed for the rpc
"""Return a commands.Queue.Declare class pre-composed for the rpc
method since this can be called multiple times.
:param bool passive: Passive declare to retrieve message count and
consumer count information
:rtype: pamqp.specification.Queue.Declare
:rtype: pamqp.commands.Queue.Declare
"""
arguments = dict(self.arguments)
Expand All @@ -406,7 +406,7 @@ def _declare(self, passive=False):
'exclusive=%s, auto_delete=%s, arguments=%r',
self.name, self.durable, passive, self.exclusive,
self.auto_delete, arguments)
return specification.Queue.Declare(queue=self.name,
return commands.Queue.Declare(queue=self.name,
durable=self.durable,
passive=passive,
exclusive=self.exclusive,
Expand Down
Loading

0 comments on commit c869a98

Please sign in to comment.