August 12, 2024

RabbitMQ Routing and Topic Exchange Patterns with PHP

Routing Pattern (Direct Exchange)

Simplified Summary

The routing pattern delivers a message only to the queues whose binding key exactly matches the message's routing key. A consumer's queue can be bound with several routing keys at once.

Problem Solved

Instead of broadcasting every message to all consumers, as a fanout exchange does, the producer can target only some of them.
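
The selection rule can be sketched in plain PHP without a broker. The snippet below is only a toy model of a direct exchange, not RabbitMQ or php-amqplib code; the function name directRoute and the queue names are hypothetical and exist purely for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a direct exchange (not broker code).
 * $bindings maps a queue name to the list of binding keys it was bound with.
 * Returns the names of the queues that would receive a message
 * published with $routingKey.
 */
function directRoute(array $bindings, string $routingKey): array
{
    $targets = [];
    foreach ($bindings as $queue => $keys) {
        // A direct exchange compares keys for exact equality.
        if (in_array($routingKey, $keys, true)) {
            $targets[] = $queue;
        }
    }
    return $targets;
}

// Hypothetical queue names, mirroring the two consumers in this article.
$bindings = [
    'queue_consumer1' => ['consumer1'],
    'queue_consumer2' => ['consumer2'],
];
```

Calling directRoute($bindings, 'consumer1') selects only queue_consumer1, which is the situation the producer and consumers below reproduce against a real broker.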

Producer Code – Only Consumer 1 Receives the Message

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

// Declare a direct exchange
$channel->exchange_declare('direct_test', 'direct', false, false, false);

$msg = new AMQPMessage('Routing test message');
// Send to exchange with routing key 'consumer1'
$channel->basic_publish($msg, 'direct_test', 'consumer1');

$channel->close();
$connection->close();

Consumer 1 Code – Listens to consumer1

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

// Declare exchange
$channel->exchange_declare('direct_test', 'direct', false, false, false);

// Create a temporary queue
$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];

// Bind queue to the exchange with routing key 'consumer1'
$channel->queue_bind($queue_name, 'direct_test', 'consumer1');

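// Print each delivered message body on its own line
$callback = function ($msg) {
    echo $msg->getBody(), "\n";
};

// The fourth argument (no_ack) is true, so messages are auto-acknowledged on delivery
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);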

$channel->consume();

$channel->close();
$connection->close();
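
As the summary notes, one queue can listen to several routing keys; this is done by calling queue_bind once per key. A minimal sketch follows, assuming a small helper named bindRoutingKeys, which is hypothetical and not part of php-amqplib. The channel parameter is left untyped so the helper can be tried without a broker; in real use it would be the AMQPChannel returned by $connection->channel().

```php
<?php
declare(strict_types=1);

/**
 * Binds one queue to an exchange under several routing keys.
 * $channel is expected to be a PhpAmqpLib\Channel\AMQPChannel; it is left
 * untyped here so the helper can be exercised with a stand-in object.
 * bindRoutingKeys is a hypothetical helper name, not a library function.
 */
function bindRoutingKeys($channel, string $queue, string $exchange, array $routingKeys): void
{
    foreach ($routingKeys as $routingKey) {
        // One binding per key: the queue receives a message if any binding matches.
        $channel->queue_bind($queue, $exchange, $routingKey);
    }
}
```

In Consumer 1, calling bindRoutingKeys($channel, $queue_name, 'direct_test', ['consumer1', 'consumer2']) in place of the single queue_bind call would make that queue receive messages published with either key.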

Consumer 2 Code – Listens to consumer2 (Will Not Receive Message)

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

$channel->exchange_declare('direct_test', 'direct', false, false, false);

$queue_info = $channel->queue_declare("", false, false, true, false);
$queue_name = $queue_info[0];

$channel->queue_bind($queue_name, 'direct_test', 'consumer2');

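// Print each delivered message body on its own line
$callback = function ($msg) {
    echo $msg->getBody(), "\n";
};

// The fourth argument (no_ack) is true, so messages are auto-acknowledged on delivery
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);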

$channel->consume();

$channel->close();
$connection->close();

Topic Pattern (Topic Exchange)

Simplified Summary

The topic pattern routes messages by pattern matching. Each queue is bound to the exchange with a binding key that may contain wildcards, and a message is delivered to every queue whose binding key matches the message's routing key.

Problem Solved

It provides a more flexible message distribution mechanism than the fanout or direct modes.

Notes

  • Routing keys are structured as dot-separated identifiers, e.g., aaa.bbb.ccc.
  • A routing key can be at most 255 bytes long.
  • Wildcards are supported:
    • * matches exactly one word.
    • # matches zero or more words.
    • (Yes, this design might seem reversed or counterintuitive.)

Examples:

  • *.*.xxx matches keys that end with .xxx and have exactly 3 segments.
  • *.xxx.* matches keys that have exactly 3 segments with xxx as the middle one.
  • a.# matches a, a.b, a.b.c, etc.
  • A binding key of # acts like fanout mode.
  • A binding key with no wildcards behaves like direct mode.
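
The wildcard rules above can be checked with a small matcher written in plain PHP. This is a sketch of the matching semantics only, not the broker's implementation; the function names topicMatches and matchWords are hypothetical, and edge cases such as empty routing keys are not modeled.

```php
<?php
declare(strict_types=1);

/**
 * Returns true if $routingKey would match $bindingKey under topic rules:
 * '*' stands for exactly one word, '#' for zero or more words.
 */
function topicMatches(string $bindingKey, string $routingKey): bool
{
    return matchWords(explode('.', $bindingKey), 0, explode('.', $routingKey), 0);
}

/**
 * Recursive helper: matches pattern words from index $p
 * against routing-key words from index $w.
 */
function matchWords(array $pattern, int $p, array $words, int $w): bool
{
    if ($p === count($pattern)) {
        // Pattern exhausted: succeed only if the key is exhausted too.
        return $w === count($words);
    }
    if ($pattern[$p] === '#') {
        // '#' may absorb zero or more words, so try every possible split.
        for ($next = $w; $next <= count($words); $next++) {
            if (matchWords($pattern, $p + 1, $words, $next)) {
                return true;
            }
        }
        return false;
    }
    if ($w === count($words)) {
        // The pattern still needs a word, but the key has none left.
        return false;
    }
    if ($pattern[$p] === '*' || $pattern[$p] === $words[$w]) {
        return matchWords($pattern, $p + 1, $words, $w + 1);
    }
    return false;
}
```

For instance, topicMatches('a.#', 'a') is true because # can match zero words, while topicMatches('*.*.xxx', 'a.xxx') is false because each * needs exactly one word.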

Producer Code

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

// Declare topic exchange
$channel->exchange_declare('topic_test', 'topic', false, false, false);

$msg = new AMQPMessage('Topic test message');

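// Routing keys to publish with. Wildcards have no special meaning in a
// routing key, so entries such as 'a.*.*' and '#.z' are sent as literal text.
$routingKeys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z', 'x', 'y', 'z', 'a', 'ab', 'ac', 'ad', 'a.x.y.z', 'abc.z'];

foreach ($routingKeys as $key) {
    // Publish to the exchange declared above ('topic_test')
    $channel->basic_publish($msg, 'topic_test', $key);
}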

$channel->close();
$connection->close();

Consumer Code

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.0.180', 5672, 'admin', '12345678');
$channel = $connection->channel();

// Declare topic exchange
$channel->exchange_declare('topic_test', 'topic', false, false, false);

// Create temporary queue
$queue_name = $channel->queue_declare("", false, false, true, false)[0];

// Bind the queue to the declared exchange ('topic_test') once per binding pattern
$binding_keys = ['a.b.c', 'aa.bb.cc', 'a.b.c.d', 'a.b', 'a.*.*', 'a.*.*.*', '#.z'];
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_test', $binding_key);
}

$callback = function ($msg) {
    echo 'RoutingKey: ', $msg->getRoutingKey(), ' --- Msg: ', $msg->getBody(), "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

try {
    $channel->consume();
} catch (\Throwable $exception) {
    echo $exception->getMessage();
}

$channel->close();
$connection->close();

Conclusion

RabbitMQ offers versatile message routing strategies:

  • Direct exchanges use explicit routing keys to deliver messages to designated queues.
  • Topic exchanges extend this with wildcard-based routing, enabling complex and flexible matching.

These patterns help fine-tune how messages are distributed across consumers in real-world applications like logging, notifications, microservices, or any distributed system architecture.