Introduction
Configuration
- Publisher Confirms
- Import/Export definitions
- Consumer Cancellation
- Configuration files
- Consumer Acknowledgments
- Prefetch
- Policies
Sample Code
Features
- Dead Letter Exchange
- Consistent Hash Exchange
- Alternate Exchange
- Shovel
- Federation
- RPC
- Direct Reply-to
- Delayed Message Exchange
- Pause Consumers
AMQP 0-9-1 Overview
Queue deep-dive
LavinMQ CLI
Management Interface
Management HTTP API
Tutorials
Security
Monitoring
Development
Support
AMQP Crystal Client
These examples cover the basics of an AMQP 0-9-1 client for Crystal. You can install your broker locally, or create a hosted broker with CloudAMQP. Just replace the connection URL with the one for your broker.
Sample code can be found at github.
Installation
Add the dependency to your shard.yml
:
dependencies:
amqp-client:
github: cloudamqp/amqp-client.cr
Run shards install
Hello World!
In this example we will write two small programs in Crystal; a producer (sender) that sends a single message, and a consumer (receiver) that receives messages and prints them out.
Publisher
Our first program HelloWorld_publisher.cr
will send a single message “Hello World!” to a queue named “hello”.
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue("hello")
q.publish_confirm "Hello World!"
puts "Sent: Hello World!"
end
end
Consumer
Our second program HelloWorld_consumer.cr
will receive messages from the queue named “hello” and print them on the screen.
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue("hello")
puts "Waiting for messages. To exit press CTRL+C"
q.subscribe(block: true) do |msg|
puts "Received: #{msg.body_io.to_s}"
end
end
end
Work Queues
In this example, we will create a work queue to distribute time-consuming tasks among multiple consumers. Each work will be delivered to exactly one consumer. If we are building up a backlog of work, we can easily scale by adding more consumers.
Publisher
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
ch.queue_declare(name: "hello", durable: true)
ch.queue_bind("hello", "amq.direct", "hello")
msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
ch.basic_publish msg, exchange: "amq.direct", routing_key: "hello", props: AMQP::Client::Properties.new(delivery_mode: 2)
puts "Sent: #{msg}"
end
end
Consumer
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue("hello")
ch.prefetch(count: 1)
puts "Waiting for messages. To exit press CTRL+C"
q.subscribe(no_ack: false, block: true) do |msg|
puts "Received: #{msg.body_io.to_s}"
msg.ack
sleep msg.body_io.to_s.count('.')
puts "Done"
end
end
end
Publish/Subscribe
In this example, we will publish a message to be delivered to multiple consumers by using a fanout exchange.
Publisher
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
ch.queue_declare(name: "hello", durable: true)
ch.queue_bind("hello", "amq.direct", "hello")
msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
ch.basic_publish msg, exchange: "amq.direct", routing_key: "hello", props: AMQP::Client::Properties.new(delivery_mode: 2)
puts "Sent: #{msg}"
end
end
Consumer
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue
q.bind exchange: "logs", routing_key: ""
puts "Waiting for logs. To exit press CTRL+C"
q.subscribe(no_ack: false, block: true) do |msg|
puts "Received: #{msg.body_io.to_s}"
msg.ack
sleep msg.body_io.to_s.count('.')
puts "Done"
end
end
end
Routing
In this example, we will publish a message to a direct exchange to be delivered to selective consumers by using routing keys. Add what routing key you want for your consumer and then publish messages with any of those routing keys. Publish a message with crystal send.cr routing_key message, for example, crystal send.cr error “This is an error message” and it will be routed to the consumer with that binding.
Publisher
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
e = ch.exchange_declare("direct_logs", type: "direct")
q = ch.queue
severity = ARGV.shift || "info"
q.bind exchange: "direct_logs", routing_key: severity
msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
e.publish msg, routing_key: severity
puts "Sent: #{msg}"
end
end
Consumer
require "amqp-client"
abort "Usage: [info] [warning] [error]" if ARGV.empty?
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue
ARGV.each do |severity|
q.bind exchange: "direct_logs", routing_key: severity
end
puts "Waiting for logs. To exit press CTRL+C"
q.subscribe(no_ack: false, block: true) do |msg|
puts "Received: #{msg.body_io.to_s}"
msg.ack
puts "Done"
end
end
end
Topics
In this example, we will publish a message to a topic exchange to be delivered to selective consumers based on multiple criteria.
A *
(star) can substitute for exactly one word.
A #
(hash) can substitute for zero or more words.
Add what routing key you want for your consumer with stars and hashes, and then publish messages with any of those routing keys. Publish a message with crystal send.cr routing_key message, for example, crystal send.cr lazy-rabbit “This is a lazy rabbit” and it will be routed to the consumer with that binding.
Sending
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
e = ch.exchange_declare("topic_animals", type: "topic")
q = ch.queue
animals = ARGV.shift || "anonymous.info"
q.bind exchange: "topic_animals", routing_key: animals
msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
ch.basic_publish msg, exchange: "topic_animals", routing_key: animals
puts "Sent: #{msg}"
end
end
Reciving
require "amqp-client"
AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
c.channel do |ch|
q = ch.queue
ARGV.each do |animals|
q.bind exchange: "topic_animals", routing_key: animals
end
puts "Waiting for logs. To exit press CTRL+C"
q.subscribe(no_ack: false, block: true) do |msg|
puts "Received: #{msg.body_io.to_s}"
msg.ack
puts "Done"
end
end
end