Skip to content

Commit

Permalink
Merge pull request #7 from yuanxing008/dev
Browse files Browse the repository at this point in the history
Fix Publisher Reconnection
  • Loading branch information
yuanxing008 authored Jun 22, 2020
2 parents 961deda + 38cf13a commit a16012e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
40 changes: 36 additions & 4 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

/**
* Class AMQPConnection
*
* @package JokerProject\LaravelAliyunAmqp
*/
class AMQPConnection
class AMQPConnection implements LoggerAwareInterface
{
use LoggerAwareTrait;

/**
* @const array Default connections parameters
*/
Expand Down Expand Up @@ -201,9 +207,35 @@ private function createConnectionByType($type)
*/
public function reconnect()
{
$this->getConnection()->channel()->close();
$this->channel = null;
$this->getConnection()->reconnect();
try {
if (!$this->connection->isConnected()) {
$this->connection = $this->getConnection();
}
if ($this->connection->channel()->is_open()) {
$this->connection->channel()->close();
}
$this->channel = null;
$this->getConnection()->reconnect();
} catch (AMQPChannelClosedException $e) {
$this->logger->info('channel was closed');
} catch (AMQPConnectionClosedException $e){
$this->getConnection()->reconnect();
$this->logger->info('connection was closed');
}
}

public function close()
{
if ($this->channel->is_open()) {
$this->channel->close();
}
if ($this->connection->isConnected()) {
try {
$this->connection->close();
} catch (\Exception $exception) {
throw $exception;
}
}
}

/**
Expand Down
19 changes: 12 additions & 7 deletions src/Entity/QueueEntity.php
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,25 @@ public function publish(string $message, string $routingKey = '')
}

try {
$this->getChannel()
->basic_publish(
new AMQPMessage($message),
'',
$this->attributes['name'],
true
);
if (!$this->getChannel()->is_open()) {
$this->getConnection()->reconnect();
}
$channel = $this->getChannel();
$channel->basic_publish(
new AMQPMessage($message),
'',
$this->attributes['name'],
true
);
$this->retryCount = 0;
$this->connection->close();
} catch (AMQPChannelClosedException $exception) {
$this->retryCount++;
// Retry publishing with re-connect
if ($this->retryCount < self::MAX_RETRIES) {
$this->getConnection()->reconnect();
$this->publish($message, $routingKey);
$this->connection->close();
return;
}
throw $exception;
Expand Down

0 comments on commit a16012e

Please sign in to comment.