diff --git a/amqp.inc b/amqp.inc index ea275a1..0da1d95 100644 --- a/amqp.inc +++ b/amqp.inc @@ -862,6 +862,8 @@ class AMQPChannel extends AbstractChannel "90,31" => "tx_rollback_ok" ); + protected $defaultCallback = null; + public function __construct($connection, $channel_id=NULL, $auto_decode=true) @@ -883,6 +885,10 @@ class AMQPChannel extends AbstractChannel $this->alerts = array(); $this->callbacks = array(); $this->auto_decode = $auto_decode; + $this->defaultCallback = array('AMQPBehavior', 'ignoreMessage'); + if ($this->debug) { + $this->defaultCallback = array('AMQPBehavior', 'debugMessage'); + } $this->x_open(); } @@ -1303,16 +1309,36 @@ class AMQPChannel extends AbstractChannel /** * end a queue consumer + * + * @param string $consumer_tag + * @param boolean $nowait + * @param callback $cancellingBehavior - defaults to use the consumer callback + * @return null */ - public function basic_cancel($consumer_tag, $nowait=false) + public function basic_cancel($consumer_tag, $nowait=false, $cancellingBehavior = null) { $args = new AMQPWriter(); $args->write_shortstr($consumer_tag); $args->write_bit($nowait); $this->send_method_frame(array(60, 30), $args); - return $this->wait(array( - "60,31" // Channel.basic_cancel_ok + if ($cancellingBehavior !== null) { + $this->callbacks[$consumer_tag] = $cancellingBehavior; + } + if (!$nowait) { + // ensure we have a callback set + if (!isset($this->callbacks[$consumer_tag])) { + $this->callbacks[$consumer_tag] = $this->defaultCallback; + } + + // Wait for a cancel message before actually breaking out of the waiting loop. + while (array_key_exists($consumer_tag, $this->callbacks)) { + $this->wait(array( + "60,60", // Channel.basic_deliver + "60,31" // Channel.basic_cancel_ok )); + } + } + return null; } /** @@ -1384,7 +1410,7 @@ class AMQPChannel extends AbstractChannel if(array_key_exists($consumer_tag, $this->callbacks)) $func = $this->callbacks[$consumer_tag]; else - $func = NULL; + $func = $this->defaultCallback; if($func!=NULL) call_user_func($func, $msg); @@ -1571,7 +1597,16 @@ class AMQPChannel extends AbstractChannel protected function tx_select_ok($args) { } - + + /** + * Set the default callback to use, if there is no matching consumer + * + * @param callback $defaultCallBack + */ + public function setDefaultCallback($defaultCallBack) + { + $this->defaultCallback = $defaultCallBack; + } } /** @@ -1604,4 +1639,53 @@ class AMQPMessage extends GenericContent } } +/** + * A static class to use with the default callbacks of a channel + * + */ +class AMQPBehavior { + /** + * Reject a message + * @param AMQPMessage $msg + * @param boolean $requeue defaults to true + */ + public static function rejectMessage($msg, $requeue = true) + { + $deliveryTag = $msg->delivery_info['delivery_tag']; + $channel = $msg->delivery_info['channel']; + $channel->basic_reject($deliveryTag, $requeue); + } + + /** + * Drop and Reject a message + * @param AMQPMessage $msg + */ + public static function dropMessage($msg) + { + self::rejectMessage($msg, false); + } + + /** + * Ignore a message + * @param AMQPMessage $msg + */ + public static function ignoreMessage($msg) + { + // no-op + } + + /** + * send message to debugging messaging + * @param AMQPMessage $msg + */ + public static function debugMessage($msg) + { + $deliveryTag = $msg->delivery_info['delivery_tag']; + $consumerTag = $msg->delivery_info['consumer_tag']; + $channel = $msg->delivery_info['channel']; + $channel_id = $channel->getChannelId(); + + debug_msg("Message {$deliveryTag} on channel_id {$channel_id} with consumer-tag {$consumerTag}"); + } +} ?> diff --git a/amqp_wire.inc b/amqp_wire.inc index 5e489e3..dcf4a15 100644 --- a/amqp_wire.inc +++ b/amqp_wire.inc @@ -302,6 +302,10 @@ class AMQPReader while($read < $n && !feof($this->sock->real_sock()) && (false !== ($buf = fread($this->sock->real_sock(), $n - $read)))) { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } + if ($buf == '') { usleep(100);