AMQP input doesn't seem to work with RabbitMQ federation

Description

I'm using the following simple logstash configs, firstly a producer that pushes messages into an AMQP broker:

and secondly a consumer that pulls from an AMQP broker:

I can run two copies of logstash, one with each config, on the same host with a local RabbitMQ 2.7.1 instance, and everything seems to work fine when I use "logger Test" to put messages into /var/log/messages.

However, I'd like to use the RabbitMQ federation plugin so that the producer and consumer can be split across a potentially unreliable WAN link, with a RabbitMQ instance at each end. The producer logstash can keep reading logs and pushing them to its local broker, where they queue up safely while the WAN link is down. The remote broker then pulls them across, where they can queue up again if no logstash consumer is running, and are otherwise processed normally.

So on another host I've created another RabbitMQ 2.7.1 instance with the following config:

On this same host I now run the consumer logstash, but I cannot get it to consume and output a message. When I test with a consumer running against the same RabbitMQ instance as the producer, I get the following output for each message:

But with the federated instances the consumer outputs the following:

The "received: nil" message seems to correspond to what is normally the header message. On the RabbitMQ instance with the consumer I now see two connections from logstash instead of one, and the message is in an un-ACK'd state. (Without "exclusive => false" in the AMQP input, the consumer loops indefinitely, complaining about locked resources.) Stopping the consumer frees up both connections and the message returns to the normal pending state in the queue. Using the RabbitMQ web interface I can peek at the message in the queue, and it looks just like the payload logged in the successful debug output (same length, etc.), so I can't see what's different about it.

The consumer RabbitMQ instance was actually a cluster of two nodes, so I initially thought the clustering might be causing the problem. However, I can use the cluster with a producer and consumer connected directly and there are no problems, so it looks related to the federation aspect of the setup. Even so, against the federated setup I can use the amqp-consume(1) tool from the RabbitMQ C library like so:

Any ideas?

Activity

Matt Dainty
February 4, 2012, 2:39 PM

I think I've figured out the most likely cause; I didn't spot it initially. When a message is pulled from one broker to another with the federation plugin, it gains an additional header with details of the original broker (I'll attach a screenshot).

My hunch is that the AMQP library logstash uses is choking on this extra header. However, I notice that logstash connects to my RabbitMQ brokers using AMQP 0.8 rather than 0.9.1; that wouldn't make a difference, would it?

Matt Dainty
February 5, 2012, 2:15 AM

Pinned this down to a bug in Bunny <= 0.7.8 that occurs with either AMQP 0.8 or 0.9.1: it didn't understand a few of the datatypes used to encode this additional header.

I've fixed up Bunny and pushed the changes to the maintainers (https://github.com/ruby-amqp/bunny/pull/35). When a new version appears, updating logstash to use it should fix this issue.
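To illustrate the kind of failure described above, here is a minimal Python sketch of a tagged AMQP field-table decoder. It is not Bunny's code. The header name "x-received-from", its contents, and the set of type tags the "limited" decoder accepts are all assumptions made for illustration; the report does not say which datatypes Bunny was missing.

```python
import struct

# Tags a hypothetical older decoder understands (an assumption, not Bunny's
# real list) versus the fuller set handled by this sketch.
BASIC_TAGS = frozenset(b"SIF")
ALL_TAGS = frozenset(b"SIFAtlV")


def encode_value(value):
    """Encode one Python value as a tagged AMQP field value."""
    if isinstance(value, bool):  # bool must be tested before int
        return b"t" + struct.pack(">B", value)
    if isinstance(value, int):
        return b"l" + struct.pack(">q", value)
    if isinstance(value, str):
        raw = value.encode("utf-8")
        return b"S" + struct.pack(">I", len(raw)) + raw
    if isinstance(value, list):
        body = b"".join(encode_value(v) for v in value)
        return b"A" + struct.pack(">I", len(body)) + body
    if isinstance(value, dict):
        return b"F" + encode_table(value)
    if value is None:
        return b"V"
    raise TypeError("cannot encode %r" % (value,))


def encode_table(table):
    """Encode a dict as a length-prefixed AMQP field table."""
    body = b""
    for name, value in table.items():
        raw = name.encode("utf-8")
        body += struct.pack(">B", len(raw)) + raw + encode_value(value)
    return struct.pack(">I", len(body)) + body


def decode_value(buf, pos, tags):
    """Decode one tagged value; return (value, next_position)."""
    tag = buf[pos]
    pos += 1
    if tag not in tags:
        raise ValueError("unsupported field type %r" % chr(tag))
    if tag == ord("t"):
        return buf[pos] != 0, pos + 1
    if tag == ord("I"):
        return struct.unpack_from(">i", buf, pos)[0], pos + 4
    if tag == ord("l"):
        return struct.unpack_from(">q", buf, pos)[0], pos + 8
    if tag == ord("S"):
        (size,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        return buf[pos:pos + size].decode("utf-8"), pos + size
    if tag == ord("A"):
        (size,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        end = pos + size
        items = []
        while pos < end:
            item, pos = decode_value(buf, pos, tags)
            items.append(item)
        return items, pos
    if tag == ord("F"):
        return decode_table(buf, pos, tags)
    if tag == ord("V"):
        return None, pos
    raise ValueError("no decoder for field type %r" % chr(tag))


def decode_table(buf, pos=0, tags=ALL_TAGS):
    """Decode a field table; return (dict, next_position)."""
    (size,) = struct.unpack_from(">I", buf, pos)
    pos += 4
    end = pos + size
    table = {}
    while pos < end:
        name_len = buf[pos]
        pos += 1
        name = buf[pos:pos + name_len].decode("utf-8")
        pos += name_len
        table[name], pos = decode_value(buf, pos, tags)
    return table, pos


# Hypothetical federation-style header: an array holding one nested table.
headers = {"x-received-from": [{"uri": "amqp://upstream-host",
                                "exchange": "logstash",
                                "redelivered": False}]}
wire = encode_table(headers)
```

Calling decode_table(wire) recovers the original headers, while decode_table(wire, tags=BASIC_TAGS) raises ValueError at the array tag. A client whose decoder lacks some type tags could in the same way fail on a federated message while handling a directly published one without trouble.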

Jordan Sissel
February 5, 2012, 3:12 AM

awesome

Jordan Sissel
October 30, 2012, 4:07 AM

Pretty sure this is fixed now since we've been shipping newer versions of the 'bunny' gem with each new release of logstash.

Assignee

Jordan Sissel

Reporter

Matt Dainty

Labels

None
