Skip to content

A possible fix for issue #5 regarding basic_cancel timing #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
94 changes: 89 additions & 5 deletions amqp.inc
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,8 @@ class AMQPChannel extends AbstractChannel
"90,31" => "tx_rollback_ok"
);

protected $defaultCallback = null;

public function __construct($connection,
$channel_id=NULL,
$auto_decode=true)
Expand All @@ -883,6 +885,10 @@ class AMQPChannel extends AbstractChannel
$this->alerts = array();
$this->callbacks = array();
$this->auto_decode = $auto_decode;
$this->defaultCallback = array('AMQPBehavior', 'ignoreMessage');
if ($this->debug) {
$this->defaultCallback = array('AMQPBehavior', 'debugMessage');
}

$this->x_open();
}
Expand Down Expand Up @@ -1303,16 +1309,36 @@ class AMQPChannel extends AbstractChannel

/**
* end a queue consumer
*
* @param string $consumer_tag
* @param boolean $nowait
* @param callback $cancellingBehavior - defaults to use the consumer callback
* @return null
*/
public function basic_cancel($consumer_tag, $nowait=false)
public function basic_cancel($consumer_tag, $nowait=false, $cancellingBehavior = null)
{
$args = new AMQPWriter();
$args->write_shortstr($consumer_tag);
$args->write_bit($nowait);
$this->send_method_frame(array(60, 30), $args);
return $this->wait(array(
"60,31" // Channel.basic_cancel_ok
if ($cancellingBehavior !== null) {
$this->callbacks[$consumer_tag] = $cancellingBehavior;
}
if (!$nowait) {
// ensure we have a callback set
if (!isset($this->callbacks[$consumer_tag])) {
$this->callbacks[$consumer_tag] = $this->defaultCallback;
}

// Wait for a cancel message before actually breaking out of the waiting loop.
while (array_key_exists($consumer_tag, $this->callbacks)) {
$this->wait(array(
"60,60", // Channel.basic_deliver
"60,31" // Channel.basic_cancel_ok
));
}
}
return null;
}

/**
Expand Down Expand Up @@ -1384,7 +1410,7 @@ class AMQPChannel extends AbstractChannel
if(array_key_exists($consumer_tag, $this->callbacks))
$func = $this->callbacks[$consumer_tag];
else
$func = NULL;
$func = $this->defaultCallback;

if($func!=NULL)
call_user_func($func, $msg);
Expand Down Expand Up @@ -1571,7 +1597,16 @@ class AMQPChannel extends AbstractChannel
protected function tx_select_ok($args)
{
}


/**
* Set the default callback to use, if there is no matching consumer
*
* @param callback $defaultCallBack
*/
public function setDefaultCallback($defaultCallBack)
{
$this->defaultCallback = $defaultCallBack;
}
}

/**
Expand Down Expand Up @@ -1604,4 +1639,53 @@ class AMQPMessage extends GenericContent
}
}

/**
* A static class to use with the default callbacks of a channel
*
*/
class AMQPBehavior {
/**
* Reject a message
* @param AMQPMessage $msg
* @param boolean $requeue defaults to true
*/
public static function rejectMessage($msg, $requeue = true)
{
$deliveryTag = $msg->delivery_info['delivery_tag'];
$channel = $msg->delivery_info['channel'];
$channel->basic_reject($deliveryTag, $requeue);
}

/**
* Drop and Reject a message
* @param AMQPMessage $msg
*/
public static function dropMessage($msg)
{
self::rejectMessage($msg, false);
}

/**
* Ignore a message
* @param AMQPMessage $msg
*/
public static function ignoreMessage($msg)
{
// no-op
}

/**
* send message to debugging messaging
* @param AMQPMessage $msg
*/
public static function debugMessage($msg)
{
$deliveryTag = $msg->delivery_info['delivery_tag'];
$consumerTag = $msg->delivery_info['consumer_tag'];
$channel = $msg->delivery_info['channel'];
$channel_id = $channel->getChannelId();

debug_msg("Message {$deliveryTag} on channel_id {$channel_id} with consumer-tag {$consumerTag}");
}
}
?>
4 changes: 4 additions & 0 deletions amqp_wire.inc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ class AMQPReader
while($read < $n && !feof($this->sock->real_sock()) &&
(false !== ($buf = fread($this->sock->real_sock(), $n - $read))))
{
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}

if ($buf == '')
{
usleep(100);
Expand Down