Skip to content

Commit

Permalink
Merge pull request #6 from yuanxing008/dev
Browse files Browse the repository at this point in the history
Fix Bug
  • Loading branch information
yuanxing008 authored May 12, 2020
2 parents 3074ad4 + 37715ba commit 961deda
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ protected function getConnection(): AbstractConnection
*/
private function createConnectionByType($type)
{
$connectionDetails = $this->connectionDetails;
if (array_key_exists('access_key', $connectionDetails)) {
$this->accessKey = $connectionDetails['access_key'];
$this->accessSecret = $connectionDetails['access_secret'];
$this->resourceOwnerId = $connectionDetails['resource_owner_id'];
if ($connectionDetails['access_key'] != ''
&& $connectionDetails['access_secret'] != ''
&& $connectionDetails['resource_owner_id'] != ''
) {
$this->connectionDetails['username'] = $this->getUser();
$this->connectionDetails['password'] = $this->getPassword();
}
}
return new $type(
$this->connectionDetails['hostname'],
$this->connectionDetails['port'],
Expand Down
9 changes: 5 additions & 4 deletions src/Entity/QueueEntity.php
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ public function startConsuming(int $messages, int $seconds, int $maxMemory)
$this->setupConsumer($messages, $seconds, $maxMemory);
while (false === $this->shouldStopConsuming()) {
try {
$this->getChannel()->wait(null, false, 1);
$this->getChannel()->wait();
} catch (AMQPTimeoutException $e) {
$this->logger->debug('startConsuming Exception--->'.$e->getMessage());
usleep(1000);
$this->getConnection()->reconnect();
$this->setupChannelConsumer();
Expand All @@ -323,7 +324,7 @@ protected function shouldStopConsuming(): bool
{
if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
$this->logger->debug(
"Stopped consumer",
"Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
[
'limit' => 'time_limit',
'value' => sprintf("%.2f", microtime(true) - $this->startTime)
Expand All @@ -333,7 +334,7 @@ protected function shouldStopConsuming(): bool
}
if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
$this->logger->debug(
"Stopped consumer",
"Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
[
'limit' => 'memory_limit',
'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
Expand All @@ -344,7 +345,7 @@ protected function shouldStopConsuming(): bool

if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
$this->logger->debug(
"Stopped consumer",
"Stopped consumer, Please make sure you are using supervisor to auto restart consumer",
['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
);
return true;
Expand Down

0 comments on commit 961deda

Please sign in to comment.