Skip to content

New features #134

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -8,16 +8,14 @@ PYTHON=python
all: build

add-submodules:
-git submodule add -b v0.8.0 https://github.com/alanxz/rabbitmq-c.git
git submodule add -b v0.9.0 https://github.com/alanxz/rabbitmq-c.git

submodules:
git submodule init
git submodule update

rabbitmq-c: submodules
(cd $(RABBIT_DIR); test -f configure || autoreconf -i)
(cd $(RABBIT_DIR); test -f Makefile || automake --add-missing)

(cd $(RABBIT_DIR); cmake .; cmake --build .)

rabbitmq-clean:
-(cd $(RABBIT_DIR) && make clean)
@@ -50,8 +48,7 @@ distclean: pyclean rabbitmq-distclean removepyc
-rm -f erl_crash.dump

$(RABBIT_TARGET):
(test -f config.h || cd $(RABBIT_DIR); ./configure --disable-tools --disable-docs)
(cd $(RABBIT_DIR); make)
(cd $(RABBIT_DIR); cmake .; cmake --build .;)


dist: rabbitmq-c $(RABBIT_TARGET)
51 changes: 42 additions & 9 deletions Modules/_librabbitmq/connection.c
Original file line number Diff line number Diff line change
@@ -7,6 +7,9 @@

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_ssl_socket.h>
#include <amqp_framing.h>

#include "connection.h"
#include "distmeta.h"
@@ -974,6 +977,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
"channel_max",
"frame_max",
"heartbeat",
"ssl",
"confirmed",
"client_properties",
NULL
};
@@ -985,12 +990,15 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
int channel_max = 0xffff;
int frame_max = 131072;
int heartbeat = 0;
int ssl = 0;
int confirmed = 0;
int port = 5672;
PyObject *client_properties = NULL;

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist,
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiiiO", kwlist,
&hostname, &userid, &password, &virtual_host, &port,
&channel_max, &frame_max, &heartbeat, &client_properties)) {
&channel_max, &frame_max, &heartbeat, &ssl, &confirmed,
&client_properties)) {
return -1;
}

@@ -1012,6 +1020,8 @@ PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self,
self->channel_max = channel_max;
self->frame_max = frame_max;
self->heartbeat = heartbeat;
self->ssl = ssl;
self->confirmed = confirmed;
self->weakreflist = NULL;
self->callbacks = PyDict_New();
if (self->callbacks == NULL) return -1;
@@ -1057,7 +1067,13 @@ PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self)
}
Py_BEGIN_ALLOW_THREADS;
self->conn = amqp_new_connection();
socket = amqp_tcp_socket_new(self->conn);
if (self->ssl == 1 ) {
socket = amqp_ssl_socket_new(self->conn);
amqp_ssl_socket_set_verify_peer(socket, 0);
amqp_ssl_socket_set_verify_hostname(socket, 0);
} else {
socket = amqp_tcp_socket_new(self->conn);
}
Py_END_ALLOW_THREADS;

if (!socket) {
@@ -1132,14 +1148,22 @@ PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self)
unsigned int
PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel)
{
amqp_rpc_reply_t reply;
amqp_rpc_reply_t replyopen;
amqp_rpc_reply_t replyconfirm;

Py_BEGIN_ALLOW_THREADS;
amqp_channel_open(self->conn, channel);
reply = amqp_get_rpc_reply(self->conn);
replyopen = amqp_get_rpc_reply(self->conn);
if (self->confirmed){
amqp_confirm_select(self->conn, (amqp_channel_t)channel);
replyconfirm = amqp_get_rpc_reply(self->conn);
}
Py_END_ALLOW_THREADS;

return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel");
if ((replyopen.reply_type != AMQP_RESPONSE_NORMAL) || !(self->confirmed)) {
return PyRabbitMQ_HandleAMQError(self, 0, replyopen, "Couldn't create channel");
} else {
return PyRabbitMQ_HandleAMQError(self, 0, replyconfirm, "Couldn't set confirm mode");
}
}


@@ -1811,13 +1835,16 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
PyObject *exchange = NULL;
PyObject *routing_key = NULL;
PyObject *propdict;
amqp_frame_t frame;

unsigned int channel = 0;
unsigned int mandatory = 0;
unsigned int immediate = 0;

char *body_buf = NULL;
Py_ssize_t body_size = 0;

int status = 0;
int ret = 0;
amqp_basic_properties_t props;
amqp_bytes_t bytes;
@@ -1852,21 +1879,27 @@ PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self,
(amqp_boolean_t)immediate,
&props,
bytes);
if (self->confirmed){
status = amqp_simple_wait_frame_on_channel(self->conn,channel,&frame);
}
amqp_maybe_release_buffers_on_channel(self->conn, channel);
Py_END_ALLOW_THREADS;

if (!PyRabbitMQ_HandleError(ret, "basic.publish")) {
goto error;
}
if ((self->confirmed) && (status != AMQP_STATUS_OK) &&
(frame.frame_type != AMQP_FRAME_METHOD) &&
(frame.payload.method.id != AMQP_BASIC_ACK_METHOD )){
goto error;
}
Py_RETURN_NONE;

error:
PyRabbitMQ_revive_channel(self, channel);
bail:
return 0;
}


/*
* Connection._basic_ack
*/
6 changes: 6 additions & 0 deletions Modules/_librabbitmq/connection.h
Original file line number Diff line number Diff line change
@@ -161,6 +161,8 @@ typedef struct {
int frame_max;
int channel_max;
int heartbeat;
int ssl;
int confirmed;

int sockfd;
int connected;
@@ -271,6 +273,10 @@ static PyMemberDef PyRabbitMQ_ConnectionType_members[] = {
offsetof(PyRabbitMQ_Connection, port), READONLY, NULL},
{"heartbeat", T_INT,
offsetof(PyRabbitMQ_Connection, heartbeat), READONLY, NULL},
{"ssl", T_INT,
offsetof(PyRabbitMQ_Connection, ssl), READONLY, NULL},
{"confirmed", T_INT,
offsetof(PyRabbitMQ_Connection, confirmed), READONLY, NULL},
{"server_properties", T_OBJECT_EX,
offsetof(PyRabbitMQ_Connection, server_properties), READONLY, NULL},
{"connected", T_INT,
7 changes: 5 additions & 2 deletions librabbitmq/__init__.py
Original file line number Diff line number Diff line change
@@ -190,14 +190,17 @@ class Connection(_librabbitmq.Connection):

def __init__(self, host='localhost', userid='guest', password='guest',
virtual_host='/', port=5672, channel_max=0xffff,
frame_max=131072, heartbeat=0, lazy=False,
frame_max=131072, heartbeat=0, ssl=False, confirmed=False, lazy=False,
client_properties=None, **kwargs):
if ':' in host:
host, port = host.split(':')
if ssl:
ssl = True
confirmed = confirmed if confirmed else kwargs.pop("confirm_publish",False)
super(Connection, self).__init__(
hostname=host, port=int(port), userid=userid, password=password,
virtual_host=virtual_host, channel_max=channel_max,
frame_max=frame_max, heartbeat=heartbeat,
frame_max=frame_max, heartbeat=heartbeat, ssl=int(ssl),confirmed=int(confirmed),
client_properties=client_properties,
)
self.channels = {}
2 changes: 1 addition & 1 deletion rabbitmq-c
Submodule rabbitmq-c updated 104 files
36 changes: 18 additions & 18 deletions setup.py
Original file line number Diff line number Diff line change
@@ -64,6 +64,9 @@ def append_env(L, e):
'amqp_socket.c',
'amqp_table.c',
'amqp_tcp_socket.c',
'amqp_openssl_hostname_validation.c',
'amqp_openssl.c',
'amqp_openssl_bio.c',
'amqp_time.c',
'amqp_url.c',
])
@@ -72,7 +75,7 @@ def append_env(L, e):

if is_linux: # Issue #42
libs.append('rt') # -lrt for clock_gettime

libs.append('ssl')
librabbitmq_ext = Extension(
'_librabbitmq',
sources=list(PyC_files) + list(librabbit_files),
@@ -96,6 +99,7 @@ def append_env(L, e):
class build(_build):
stdcflags = [
'-DHAVE_CONFIG_H',
'-DENABLE_SSL_SUPPORT=ON',
]
if os.environ.get('PEDANTIC'):
# Python.h breaks -pedantic, so can only use it while developing.
@@ -123,26 +127,22 @@ def run(self):
)

try:

if not os.path.isdir(os.path.join(LRMQDIST(), '.git')):
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'v0.8.0',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))
print('- pull submodule rabbitmq-c...')
if os.path.isfile('Makefile'):
os.system(' '.join([make, 'submodules']))
else:
os.system(' '.join(['git', 'clone', '-b', 'v0.9.0',
'https://github.com/alanxz/rabbitmq-c.git',
'rabbitmq-c']))

os.chdir(LRMQDIST())

if not os.path.isfile('configure'):
print('- autoreconf')
os.system('autoreconf -i')

if not os.path.isfile('config.h'):
print('- configure rabbitmq-c...')
if os.system('/bin/sh configure --disable-tools \
--disable-docs --disable-dependency-tracking'):
return
print('- cmake')
os.system('cmake .')
print(' -build')
os.system('cmake --build .')
finally:
os.environ.update(restore)
finally:
@@ -157,7 +157,7 @@ def run(self):
return librabbitmq_ext, build


def find_make(alt=('gmake', 'gnumake', 'make', 'nmake')):
def find_make(alt=('gmake', 'gnumake', 'make', 'nmake','cmake')):
for path in os.environ['PATH'].split(':'):
for make in (os.path.join(path, m) for m in alt):
if os.path.isfile(make):