Geographic filtering with Streams

LavinMQ 2.6.0 introduces geographic filtering, which allows consumers to filter messages based on location coordinates. This is particularly useful for location-based services like delivery tracking, ride-sharing platforms, or real-time asset monitoring where you only want to process events within specific geographic areas.

For example, imagine a delivery tracking system where dispatch centers only need to see deliveries in their service area. Instead of receiving all delivery events globally and filtering them locally, each dispatch center can subscribe to deliveries within their geographic region directly.

If you are new to LavinMQ Streams, start with the LavinMQ Streams guide for the basics of declaring and consuming stream queues.

Filter types

LavinMQ supports three types of geographic filters:

  • geo-within-radius: Filters messages within a specified distance from a center point. The distance is calculated using the Haversine formula and measured in kilometers. This is ideal for finding events near a specific location.

  • geo-bbox: Filters messages within a rectangular bounding box defined by minimum and maximum latitude/longitude coordinates. This is useful for filtering events within a specific geographic region like a city.

  • geo-polygon: Filters messages that fall within a custom polygon boundary. This allows for complex geographic areas with irregular shapes. LavinMQ uses a ray-casting algorithm to determine if coordinates fall within the polygon.
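The ray-casting test mentioned for geo-polygon can be sketched in a few lines. This is only an illustration of the general technique, treating latitude and longitude as planar coordinates. The function name `point_in_polygon` and the L-shaped sample area are hypothetical, and LavinMQ's own implementation may handle edges and vertices differently.

```python
def point_in_polygon(lat, lon, polygon):
    """Return True if (lat, lon) is inside polygon, a list of [lat, lon] vertices.

    Casts a ray from the point and counts how many polygon edges it crosses:
    an odd count means the point is inside.
    """
    inside = False
    n = len(polygon)
    for i in range(n):
        lat1, lon1 = polygon[i]
        lat2, lon2 = polygon[(i + 1) % n]  # wrap around to close the polygon
        # Only edges that straddle the point's latitude can be crossed.
        if (lat1 > lat) != (lat2 > lat):
            # Longitude at which this edge crosses the point's latitude.
            cross_lon = lon1 + (lat - lat1) * (lon2 - lon1) / (lat2 - lat1)
            if lon < cross_lon:
                inside = not inside
    return inside


# Hypothetical L-shaped area in abstract coordinates (not real places).
l_shape = [[0, 0], [0, 4], [2, 4], [2, 2], [4, 2], [4, 0]]

print(point_in_polygon(1, 3, l_shape))  # point in the wide arm of the L
print(point_in_polygon(3, 3, l_shape))  # point in the notch, outside the L
print(point_in_polygon(3, 1, l_shape))  # point in the narrow arm of the L
```

The notch case is the reason a polygon filter is useful: a bounding box around the same area would also match points in the notch.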

Setting up geographic filtering

Publisher side: When publishing messages to a stream, include geographic coordinates using the x-geo-lat (latitude) and x-geo-lon (longitude) headers.

Consumer side: When subscribing, pass an x-stream-filter consumer argument whose value names one of the geographic filter types and supplies that filter's parameters.
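As a rough sketch, both sides can be wrapped in small helpers that build the headers and arguments as plain dictionaries. The header and argument keys below are the ones used in this tutorial's examples. The helper names (`geo_headers`, `radius_filter_args`) and the client-side range checks are assumptions added for illustration, not part of LavinMQ or pika.

```python
def geo_headers(lat, lon):
    """Build the publisher headers for one message.

    The range checks are a client-side convenience (an assumption of this
    sketch); they catch swapped or malformed coordinates before publishing.
    """
    if not -90 <= lat <= 90:
        raise ValueError(f"latitude out of range: {lat}")
    if not -180 <= lon <= 180:
        raise ValueError(f"longitude out of range: {lon}")
    return {'x-geo-lat': lat, 'x-geo-lon': lon}


def radius_filter_args(lat, lon, radius_km, offset='first'):
    """Build consumer arguments for a geo-within-radius subscription."""
    if radius_km <= 0:
        raise ValueError(f"radius must be positive: {radius_km}")
    return {
        'x-stream-offset': offset,
        'x-stream-filter': {
            'geo-within-radius': {'lat': lat, 'lon': lon, 'radius_km': radius_km}
        },
    }


print(geo_headers(59.3293, 18.0686))
print(radius_filter_args(59.3293, 18.0686, 10))
```

The returned dictionaries can be passed as `headers` in `pika.BasicProperties` and as `arguments` in `basic_consume`, respectively.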

Geographic filtering in action

Here are examples showing how to use each of the three geographic filter types with delivery events in Sweden.

Publishing deliveries with coordinates

import pika
import json

# Connect to LavinMQ
connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()

# Declare a stream queue
channel.queue_declare(
    queue='deliveries',
    durable=True,
    arguments={'x-queue-type': 'stream'}
)

# Publish delivery events with geographic coordinates from Swedish cities
deliveries = [
    {'id': 'D001', 'lat': 59.3293, 'lon': 18.0686, 'status': 'in_transit'},  # Stockholm
    {'id': 'D002', 'lat': 59.3598, 'lon': 18.0007, 'status': 'in_transit'},  # Solna
    {'id': 'D003', 'lat': 57.7089, 'lon': 11.9746, 'status': 'delivered'},   # Göteborg
    {'id': 'D004', 'lat': 59.8586, 'lon': 17.6389, 'status': 'in_transit'},  # Uppsala
]

for delivery in deliveries:
    headers = {
        'x-geo-lat': delivery['lat'],
        'x-geo-lon': delivery['lon']
    }

    channel.basic_publish(
        exchange='',
        routing_key='deliveries',
        body=json.dumps(delivery),
        properties=pika.BasicProperties(
            headers=headers,
        )
    )
    print(f"[✅] Published delivery {delivery['id']}")

connection.close()

Example 1: Radius filter (geo-within-radius)

Filter deliveries within 10 km of central Stockholm:

import pika
import json

connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
channel = connection.channel()

channel.queue_declare(
    queue='deliveries',
    durable=True,
    arguments={'x-queue-type': 'stream'}
)

# Subscribe with geographic radius filter
# Center: Stockholm (59.3293° N, 18.0686° E), Radius: 10km
consumer_tag = 'stockholm-dispatch'
arguments = {
  'x-stream-offset': 'first',
  'x-stream-filter': {
    'geo-within-radius': {
      'lat': 59.3293,
      'lon': 18.0686,
      'radius_km': 10
    }
  }
}

def callback(ch, method, properties, body):
    delivery = json.loads(body)
    print(f"[📦] Received delivery: {delivery['id']} at ({delivery['lat']}, {delivery['lon']})")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=100)
channel.basic_consume(
    queue='deliveries',
    on_message_callback=callback,
    auto_ack=False,
    consumer_tag=consumer_tag,
    arguments=arguments
)

print("[❎] Waiting for deliveries within 10 km of Stockholm. To exit press CTRL+C")
channel.start_consuming()

This will receive deliveries D001 (Stockholm, at the center point) and D002 (Solna, roughly 5 km away), but filter out D003 (Göteborg) and D004 (Uppsala, roughly 64 km away) as they fall outside the 10 km radius.
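The radius result can be sanity-checked locally with the Haversine formula. The sketch below assumes a mean Earth radius of 6371 km; the constant LavinMQ uses is not stated here, so distances may differ slightly from the broker's. The function name `haversine_km` is hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius; the broker's value may differ


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# Coordinates of the sample deliveries published earlier in this tutorial.
deliveries = {
    'D001': (59.3293, 18.0686),  # Stockholm
    'D002': (59.3598, 18.0007),  # Solna
    'D003': (57.7089, 11.9746),  # Göteborg
    'D004': (59.8586, 17.6389),  # Uppsala
}
center = (59.3293, 18.0686)
radius_km = 10

within = [
    delivery_id
    for delivery_id, (lat, lon) in deliveries.items()
    if haversine_km(center[0], center[1], lat, lon) <= radius_km
]
print(within)
```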

Example 2: Bounding box filter (geo-bbox)

Filter deliveries within the Greater Stockholm area using a bounding box. Only the consumer arguments change; the rest of the consumer is the same as in Example 1:

# Subscribe with geographic bounding box filter covering Greater Stockholm
arguments = {
  'x-stream-offset': 'first',
  'x-stream-filter': {
    'geo-bbox': {
      'min_lat': 59.2,
      'max_lat': 59.5,
      'min_lon': 17.8,
      'max_lon': 18.3
    }
  }
}

This will receive deliveries D001 (Stockholm) and D002 (Solna) as they fall within the rectangular boundary, while filtering out D003 (Göteborg) and D004 (Uppsala).
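A bounding-box test is just two range comparisons, which makes it easy to check locally. This sketch assumes inclusive edges and a box that does not cross the antimeridian (180° longitude); how LavinMQ treats those cases is not covered here. The helper name `in_bbox` is hypothetical.

```python
def in_bbox(lat, lon, bbox):
    """Return True if (lat, lon) lies within bbox (edges treated as inclusive)."""
    return (bbox['min_lat'] <= lat <= bbox['max_lat']
            and bbox['min_lon'] <= lon <= bbox['max_lon'])


# Same box as the geo-bbox filter above.
greater_stockholm = {'min_lat': 59.2, 'max_lat': 59.5, 'min_lon': 17.8, 'max_lon': 18.3}

# Coordinates of the sample deliveries published earlier in this tutorial.
deliveries = {
    'D001': (59.3293, 18.0686),  # Stockholm
    'D002': (59.3598, 18.0007),  # Solna
    'D003': (57.7089, 11.9746),  # Göteborg
    'D004': (59.8586, 17.6389),  # Uppsala
}

matched = [
    delivery_id
    for delivery_id, (lat, lon) in deliveries.items()
    if in_bbox(lat, lon, greater_stockholm)
]
print(matched)
```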

Example 3: Polygon filter (geo-polygon)

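Filter deliveries within a custom polygon area (e.g., a specific district). Each point is a [latitude, longitude] pair, and the rest of the consumer is again the same as in Example 1: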

# Subscribe with geographic polygon filter covering central Stockholm
arguments = {
  'x-stream-offset': 'first',
  'x-stream-filter': {
    'geo-polygon': {
      'points': [
        [59.35, 18.00],
        [59.35, 18.15],
        [59.30, 18.15],
        [59.30, 18.00],
      ]
    }
  }
}

This will receive only deliveries that fall within the defined polygon boundaries. In this case, only D001 (Stockholm) would be received. D002 (Solna) sits at latitude 59.3598, just north of the polygon's 59.35 upper edge, so it is filtered out along with D003 (Göteborg) and D004 (Uppsala).


Ready to take the next steps?

Managed LavinMQ instance via CloudAMQP

LavinMQ has been built with performance and ease of use in mind: we've benchmarked a throughput of about 1,000,000 messages/sec. You can try LavinMQ without any installation hassle by creating a free instance on CloudAMQP. Signing up is a breeze.

Get started with CloudAMQP ->

Help and feedback

We welcome your feedback and are eager to address any questions you may have about this piece or using LavinMQ. Join our Slack channel to connect with us directly. You can also find LavinMQ on GitHub.

