Как заставить работать Swoole сокет сервер в связке с RabbitMQ?

Привет. Наигравшись с RabbitMQ и Swoole по отдельности, решил их объединить. Суть задачи:
1) Php приложение отдает сообщение RabbitMQ воркеру
2) RabbitMQ воркер принимает сообщение и транслирует его всем существующим сокет соединениям

Для начала создал Swoole сокет сервер который слушает соединения (swoole_sever.php). Запускаю его в одном окне терминала.
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
    echo "received message: {$frame->data}\n";
    $server->push($frame->fd, json_encode(["hello", "world"]));
});

$server->on('close', function($server, $fd)
{
    echo "connection close: {$fd}\n";
});

$server->start();


Далее, в другом окне терминала запускается RabbitMQ воркер, который ждет сообщения (worker.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);


    // Здесь я пытаюсь подключиться к сокету и отправить сообщение
    $cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();


Создаю новую задачу в которой будет отправляться сообщение RabbitMQ воркеру (task.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();


И наконец запускаю задачу
php new_task.php

В итоге задача успешно доходит до воркера, тот успешно отрабатывает функцию обратного вызова
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

А вот в Swoole сокет сервер ничего не приходит. Даже в терминале не видно сообщения "connection open". Однако, если после этого остановить воркер в терминале, то в окне где работает сокет сервер срабатывает функция
$server->on('close', function($server, $fd)

Как такое вообще возможно? Уже 2ой день пытаюсь разобраться :(
  • Вопрос задан
  • 316 просмотров
Решения вопроса 1
@acerrusm Автор вопроса
Есть новости по поводу Swoole + RabbitMQ. Добился от разработчика хоть какого то пояснения ТУТ. Но опять же, пример кода который он выложил у меня не работает. Пробовал сначала на виртуалке, потом на простом ubuntu сервере, но тщетно. Хотя картина более или менее проясняется.

Написал разработчику о проблеме, но он пока молчит. Потому хочу попросить тех, кто заинтересован, попробовать у себя запустить swoole вместе c rabbitmq. Вдруг все таки я криворук и пример кода рабочий. Если вы столкнетесь с такой же проблемой, то обязательно напишите об этом на гитхабе.

Шаг 1.
Установите Swoole
Если будете устанавливать Swoole, то устанавливайте с помощью команды:
sudo pecl install swoole-2.1.1

Не рекомендую устанавливать компилируя, иначе потом запаритесь удалять т.к. uninstaller не прилагается и команда "make uninstall" не сработает.

Обязательно должна быть версия 2.1.1. т.к. в версиях ниже нет coroutine.

Шаг 2:
Установите IDE-helper, что бы было приятнее работать с методами и классами

Шаг 3:
Установите phpAMQP. Это репозиторий который swoole форкнул себе и добавил поддержку Swoole. Т.е. команда
composer require php-amqplib/php-amqplib
установит только phpAMQP БЕЗ поддержки Swoole и вам нужно будет ручками добавить 2 файлика:
1) php-amqplib/PhpAmqpLib/Connection/AMQPSwooleConnection.php
2) php-amqplib/PhpAmqpLib/Wire/IO/SwooleIO.php

Шаг 4:
Установите RabbitMQ

Удачи!

UPDATE: на StackOverflow помогли с решением: https://stackoverflow.com/questions/49226659/swool...

Проблема была в том, что в worker.php я использовал экземпляр класса swoole_http_client, который как оказалось работает асинхронно.

Для синхронной работы потребуется WebSocketClient класс, который можно найти тут

Далее заменить
$cli = new \swoole_http_client('0.0.0.0', 2345);

    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });

    $cli->upgrade('/', function($cli)
    {
        $cli->push('This is the message to send to Swoole server');
        $cli->close();
    });

на
$client = new WebSocketClient('0.0.0.0', 2345);
$client->connect();
$client->send('This is the message to send to Swoole server');
$recv = $client->recv();
print_r($recv);
$client->close();


Все.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через TM ID
Похожие вопросы
Skyeng Москва
До 180 000 руб.
Dropwow Москва
от 160 000 до 200 000 руб.
Skyeng Москва
До 160 000 руб.
14 авг. 2018, в 19:03
10000 руб./за проект
14 авг. 2018, в 18:10
1000 руб./в час
14 авг. 2018, в 18:01
12000 руб./за проект