AMQP Crystal Client

These examples cover the basics of an AMQP 0-9-1 client for Crystal. You can install your broker locally, or create a hosted broker with CloudAMQP. Just replace the connection URL with the one for your broker.

Sample code can be found at github.

Installation

Add the dependency to your shard.yml:

  
    dependencies:
      amqp-client:
        github: cloudamqp/amqp-client.cr
  
  

Run shards install

Hello World!

In this example we will write two small programs in Crystal; a producer (sender) that sends a single message, and a consumer (receiver) that receives messages and prints them out.

Publisher

Our first program HelloWorld_publisher.cr will send a single message “Hello World!” to a queue named “hello”.

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      q = ch.queue("hello")
      q.publish_confirm "Hello World!"
      puts "Sent: Hello World!"
    end
  end

Consumer

Our second program HelloWorld_consumer.cr will receive messages from the queue named “hello” and print them on the screen.

require "amqp-client"

AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
  c.channel do |ch|
    q = ch.queue("hello")
    puts "Waiting for messages. To exit press CTRL+C"
    q.subscribe(block: true) do |msg|
      puts "Received: #{msg.body_io.to_s}"
    end
  end
end

Work Queues

In this example, we will create a work queue to distribute time-consuming tasks among multiple consumers. Each work will be delivered to exactly one consumer. If we are building up a backlog of work, we can easily scale by adding more consumers.

Publisher

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      ch.queue_declare(name: "hello", durable: true)
      ch.queue_bind("hello", "amq.direct", "hello")
      msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
      ch.basic_publish msg, exchange: "amq.direct", routing_key: "hello", props: AMQP::Client::Properties.new(delivery_mode: 2)
      puts "Sent: #{msg}"
    end
  end

Consumer

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
  c.channel do |ch|
    q = ch.queue("hello")
    ch.prefetch(count: 1)
    puts "Waiting for messages. To exit press CTRL+C"
    q.subscribe(no_ack: false, block: true) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      msg.ack
      sleep msg.body_io.to_s.count('.')
      puts "Done"
    end
  end
  end

Publish/Subscribe

In this example, we will publish a message to be delivered to multiple consumers by using a fanout exchange.

Publisher

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      ch.queue_declare(name: "hello", durable: true)
      ch.queue_bind("hello", "amq.direct", "hello")
      msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
      ch.basic_publish msg, exchange: "amq.direct", routing_key: "hello", props: AMQP::Client::Properties.new(delivery_mode: 2)
      puts "Sent: #{msg}"
    end
  end

Consumer

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      q = ch.queue
      q.bind exchange: "logs", routing_key: ""
      puts "Waiting for logs. To exit press CTRL+C"
      q.subscribe(no_ack: false, block: true) do |msg|
        puts "Received: #{msg.body_io.to_s}"
        msg.ack
        sleep msg.body_io.to_s.count('.')
        puts "Done"
      end
    end
  end

Routing

In this example, we will publish a message to a direct exchange to be delivered to selective consumers by using routing keys. Add what routing key you want for your consumer and then publish messages with any of those routing keys. Publish a message with crystal send.cr routing_key message, for example, crystal send.cr error “This is an error message” and it will be routed to the consumer with that binding.

Publisher

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      e = ch.exchange_declare("direct_logs", type: "direct")
      q = ch.queue
      severity = ARGV.shift || "info"
      q.bind exchange: "direct_logs", routing_key: severity
      msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
      e.publish msg, routing_key: severity
      puts "Sent: #{msg}"
    end
  end

Consumer

  require "amqp-client"

  abort "Usage: [info] [warning] [error]" if ARGV.empty?

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
  c.channel do |ch|
    q = ch.queue
    ARGV.each do |severity|
      q.bind exchange: "direct_logs", routing_key: severity
    end
    puts "Waiting for logs. To exit press CTRL+C"
    q.subscribe(no_ack: false, block: true) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      msg.ack
      puts "Done"
    end
  end
  end

Topics

In this example, we will publish a message to a topic exchange to be delivered to selective consumers based on multiple criteria.

A * (star) can substitute for exactly one word. A # (hash) can substitute for zero or more words.

Add what routing key you want for your consumer with stars and hashes, and then publish messages with any of those routing keys. Publish a message with crystal send.cr routing_key message, for example, crystal send.cr lazy-rabbit “This is a lazy rabbit” and it will be routed to the consumer with that binding.

Sending

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
    c.channel do |ch|
      e = ch.exchange_declare("topic_animals", type: "topic")
      q = ch.queue
      animals = ARGV.shift || "anonymous.info"
      q.bind exchange: "topic_animals", routing_key: animals
      msg = ARGV.empty? ? "Hello World!" : ARGV.join(' ')
      ch.basic_publish msg, exchange: "topic_animals", routing_key: animals
      puts "Sent: #{msg}"
    end
  end

Reciving

  require "amqp-client"

  AMQP::Client.start("amqps://user:password@hostname/vhost") do |c|
  c.channel do |ch|
    q = ch.queue
    ARGV.each do |animals|
      q.bind exchange: "topic_animals", routing_key: animals
    end
    puts "Waiting for logs. To exit press CTRL+C"
    q.subscribe(no_ack: false, block: true) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      msg.ack
      puts "Done"
    end
  end
  end

Ready to take the next steps? Here are some things you should keep in mind:

Managed LavinMQ instance on CloudAMQP

LavinMQ has been built with performance and ease of use in mind - we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.