使用php开发rabbitmq示例

时间:2019-01-14
本文章向大家介绍使用php开发rabbitmq示例,主要包括使用php开发rabbitmq示例使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

关于rabbitmq的anz安装,可通过资料查阅自行安装
可参考 https://blog.csdn.net/helloworld_dream/article/details/83501480

主要实现通过rabbitmq发送电子邮件的一个示例,并且说一下官网给的demo中 每行代码的讲解。建议先去官网实根据文档操作一遍

使用RabbitMQ

完成RabbitMQ的安装后,我们现在可以安装PHP的AMQP库,它实现了高级消息队列协议。首先创建一个新目录,我们将把所有文件放在测试RabbitMQ的位置。接下来,创建一个composer.json文件并添加以下内容:

{
  "require": {
      "videlalvaro/php-amqplib": "2.2.*"
  }
}

打开终端并 cd 进入先前创建的目录,然后执行 composer install 以安装AMQP库。
在我们继续之前,还可以安装 Swiftmailer。您可以通过从终端执行以下命令来执行此操作。这也为你的composer.json添加了Swiftmailer的一个条目:

composer require swiftmailer/swiftmailer @stable

我们将使用Swiftmailer作为我们正在创建的示例应用程序,它将向我们想象中的用户发送电子邮件。通常,电子邮件在发送之前需要几秒钟。添加到电子邮件的附件也增加了那个时间。在现实世界中,我们并不希望用户等待。我们想要做的是让他们相信我们已经通过显示已发送的消息发送了电子邮件。

这就是RabbitMQ的用武之地。我们将它用作某种邮箱,多个用户可以放弃他们的邮件。然后RabbitMQ将负责在后台发送邮件。

发送消息

首先,让我们创建用于发送电子邮件的表单。它将接受发件人的姓名和电子邮件地址,接收者的电子邮件地址,然后是主题和消息。命名文件 form.php

<?php
if(!empty($_GET['sent'])){
?>
<div>
    Your message was sent!
</div>
<?php
}
?>
<form action="mailer.php" method="POST">
    <div>
        <label for="from">From</label>
        <input type="text" name="from" id="from">       
    </div>
    <div>
        <label for="from_email">From Email</label>
        <input type="text" name="from_email" id="from_email">       
    </div>
    <div>
        <label for="to_email">To Email</label>
        <input type="text" name="to_email" id="to_email">           
    </div>
    <div>
        <label for="subject">Subject</label>
        <input type="text" name="subject" id="subject">
    </div>
    <div>
        <label for="message">Message</label>
        <textarea name="message" id="message" cols="30" rows="10"></textarea>   
    </div>
    <div>
        <button type="submit">Send</button>
    </div>
</form>

接下来,创建将消息推送到队列中的文件,并将其命名为 sender.php
需要引入该 autoload.php 文件,以便PHP自动加载我们的依赖项。然后使用 AMQP库中的 AMQPConnectionAMQPMessageAMQPConnection 允许我们创建一个到RabbitMQ服务器的新连接,并且 AMQPMessage 允许我们创建可以推送到队列的消息。

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
 
$channel->queue_declare('email_queue', false, false, false, false);
 
$data = json_encode($_POST);
 
$msg = new AMQPMessage($data, array('delivery_mode' => 2));
$channel->basic_publish($msg, '', 'email_queue');
 
header('Location: form.php?sent=true');

打破它,首先我们通过创建一个新的AMQPConnection 类实例来创建一个新的连接 。这需要以下参数:

  • host - 运行RabbitMQ服务器的主机。在这种情况下,我们已经在我们运行脚本的同一台计算机上安装了RabbitMQ。所以它应该是 localhost。请注意,在现实世界中,我们将在另一台服务器上安装RabbitMQ,与用于服务我们网站的服务器不同。因此, localhost 我们不使用该服务器的公共IP地址。
  • port - 运行RabbitMQ服务器的端口。
  • user - 登录服务器的用户名。默认情况下,用户名设置为 guest。
  • password - 登录服务器的密码。默认情况下,密码设置为 guest。

现在,我们将创建一个channel。我们可以通过channel() 从我们刚刚声明的连接中调用方法来做到这 一点。

$channel = $connection->channel();

然后我们将通过调用queue_declare 方法声明要使用的队列 。

$channel->queue_declare('email_queue', false, false, false, false);

该 queue_declare 方法采用以下参数:

  • queue_name- 要用于队列的名称。你可以为此提供任何东西。
  • passive - 一个布尔值,用于指定是否检查现有交换。
  • durable - 一个布尔值,用于指定RabbitMQ在服务器崩溃时是否保留队列。
  • exclusive - 一个布尔值,用于指定队列是否仅由一个连接使用。
  • auto-delete - 一个布尔值,用于指定在最后一个订阅者取消订阅时是否删除队列。

接下来,我们将从表单接收的POST数据转换为JSON字符串。我们只能将字符串作为消息传递,因此我们以后必须将其转换为接收端的数组。

$data = json_encode($_POST);

接下来我们创建一条新消息。这接受2个参数:数据和数组。对于数组,我们指定 delivery_mode = 2,这意味着消息是持久的。这意味着当服务器崩溃或发生错误时它不会丢失。

$msg = new AMQPMessage($data, array('delivery_mode' => 2));

接下来,我们将通过调用basic_publish() 通道上的方法来发布消息 。这接受3个参数:message,exchange 和 queue。如果你想知道为什么我们将exchange设置为空字符串,那是因为我们并不真的需要它。交换通常用于pub-sub模式。这里使用的只是基本的发布。

$channel->basic_publish($msg, '', 'email_queue');

最终,我们重定向到表单。

header('Location: form.php?sent=true');

接收消息 (Receiving Messages)

现在我们准备编写将接收用户发送的消息的代码。命名文件 receiver.php。这是文件的全部内容:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
 
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
 
$channel->queue_declare('email_queue', false, false, false, false);
 
echo ' * Waiting for messages. To exit press CTRL+C', "\n";
 
$callback = function($msg){
 
    echo " * Message received", "\n";
    $data = json_decode($msg->body, true);
 
    $from = $data['from'];
    $from_email = $data['from_email'];
    $to_email = $data['to_email'];
    $subject = $data['subject'];
    $message = $data['message'];
 
    $transporter = Swift_SmtpTransport::newInstance('smtp.gmail.com', 465, 'ssl')
      ->setUsername('YOUR_GMAIL_EMAIL')
      ->setPassword('YOUR_GMAIL_PASSWORD');
 
    $mailer = Swift_Mailer::newInstance($transporter);  
 
    $message = Swift_Message::newInstance($transporter)
        ->setSubject($subject)
        ->setFrom(array($from_email => $from))
        ->setTo(array($to_email))
        ->setBody($message);
 
    $mailer->send($message);
 
    echo " * Message was sent", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
 
$channel->basic_qos(null, 1, null);
$channel->basic_consume('email_queue', '', false, false, false, false, $callback);
 
while(count($channel->callbacks)) {
    $channel->wait();
}

前5行代码基本上与我们在sender.php 文件上的代码相同 。然后我们只输出一条消息,告诉我们如何阻止文件运行。我们需要从终端运行这个文件,停止运行 键入 CTRL+C
现在我们将声明一个用于处理我们从发送方传递的消息的匿名函数。它做的第一件事就是收到消息的输出。然后我们使用 json_decode() 将JSON字符串转换回数组。

$callback = function($msg){
    echo " * Message received", "\n";
    $data = json_decode($msg->body, true);
};

中间那段代码是处理邮件发送的逻辑,先获取参数然后发送邮件

最后,我们发送消息发送的消息和输出内容。最后一行基本上告诉RabbitMQ消息的发送确实成功了。

$mailer->send($message);
echo " * Message was sent", "\n";
 
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

您现在可以通过转到终端并执行以下命令来运行 消费者方代码:

php receiver.php

运行后,转到浏览器并访问该 sender.php 文件。输入邮件的详细信息,然后单击“发送”。你立即受到’你的消息被发送’的文本,但如果您立即检查您的电子邮件帐户,它还没有。如果它不在那里,那么队列仍在处理它。检查执行接收器的终端窗口上显示的输出。如果已发送该电子邮件,您应该会看到“已发送邮件”。