Как заставить работать Swoole сокет сервер в связке с RabbitMQ?

Привет. Наигравшись с RabbitMQ и Swoole по отдельности, решил их объединить. Суть задачи:
1) Php приложение отдает сообщение RabbitMQ воркеру
2) RabbitMQ воркер принимает сообщение и транслирует его всем существующим сокет соединениям

Для начала создал Swoole сокет сервер который слушает соединения (swoole_sever.php). Запускаю его в одном окне терминала.
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();


Далее, в другом окне терминала запускается RabbitMQ воркер, который ждет сообщения (worker.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Здесь я пытаюсь подключиться к сокету и отправить сообщение
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();


Создаю новую задачу в которой будет отправляться сообщение RabbitMQ воркеру (task.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();


И наконец запускаю задачу
php new_task.php

В итоге задача успешно доходит до воркера, тот успешно отрабатывает функцию обратного вызова
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

А вот в Swoole сокет сервер ничего не приходит. Даже в терминале не видно сообщения "connection open". Однако, если после этого остановить воркер в терминале, то в окне где работает сокет сервер срабатывает функция
$server->on('close', function($server, $fd)

Как такое вообще возможно? Уже 2ой день пытаюсь разобраться :(
  • Вопрос задан
  • 1618 просмотров
Решения вопроса 1
@acerrusm Автор вопроса
Есть новости по поводу Swoole + RabbitMQ. Добился от разработчика хоть какого то пояснения ТУТ. Но опять же, пример кода который он выложил у меня не работает. Пробовал сначала на виртуалке, потом на простом ubuntu сервере, но тщетно. Хотя картина более или менее проясняется.

Написал разработчику о проблеме, но он пока молчит. Потому хочу попросить тех, кто заинтересован, попробовать у себя запустить swoole вместе c rabbitmq. Вдруг все таки я криворук и пример кода рабочий. Если вы столкнетесь с такой же проблемой, то обязательно напишите об этом на гитхабе.

Шаг 1.
Установите Swoole
Если будете устанавливать Swoole, то устанавливайте с помощью команды:
sudo pecl install swoole-2.1.1

Не рекомендую устанавливать компилируя, иначе потом запаритесь удалять т.к. uninstaller не прилагается и команда "make uninstall" не сработает.

Обязательно должна быть версия 2.1.1. т.к. в версиях ниже нет coroutine.

Шаг 2:
Установите IDE-helper, что бы было приятнее работать с методами и классами

Шаг 3:
Установите phpAMQP. Это репозиторий который swoole форкнул себе и добавил поддержку Swoole. Т.е. команда
composer require php-amqplib/php-amqplib
установит только phpAMQP БЕЗ поддержки Swoole и вам нужно будет ручками добавить 2 файлика:
1) php-amqplib/PhpAmqpLib/Connection/AMQPSwooleConnection.php
2) php-amqplib/PhpAmqpLib/Wire/IO/SwooleIO.php

Шаг 4:
Установите RabbitMQ

Удачи!

UPDATE: на StackOverflow помогли с решением: https://stackoverflow.com/questions/49226659/swool...

Проблема была в том, что в worker.php я использовал экземпляр класса swoole_http_client, который как оказалось работает асинхронно.

Для синхронной работы потребуется WebSocketClient класс, который можно найти тут

Далее заменить
$cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });

на
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();


Все.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы