Skip to content

Commit 469f83a

Browse files
committed
Add support for per-message properties
Fixes #66
1 parent 9c2843d commit 469f83a

File tree

5 files changed

+39
-4
lines changed

5 files changed

+39
-4
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.0.9
2+
- Support per-message properties
3+
14
## 4.0.7
25
- Require v4.3.0 of rabbitmq-connection_mixin which bumps the major version of the java rabbitmq lib
36

docs/index.asciidoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
4646
| <<plugins-{type}s-{plugin}-heartbeat>> |<<number,number>>|No
4747
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|Yes
4848
| <<plugins-{type}s-{plugin}-key>> |<<string,string>>|No
49+
| <<plugins-{type}s-{plugin}-message_properties>> |<<Hash,Hash>>|No
4950
| <<plugins-{type}s-{plugin}-passive>> |<<boolean,boolean>>|No
5051
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
5152
| <<plugins-{type}s-{plugin}-persistent>> |<<boolean,boolean>>|No
@@ -169,6 +170,19 @@ Key to route to by default. Defaults to 'logstash'
169170

170171
* Routing keys are ignored on fanout exchanges.
171172

173+
[id="plugins-{type}s-{plugin}-message_properties"]
174+
===== `message_properties`
175+
176+
* Value type is <<hash,hash>>
177+
* Default value is `{}`
178+
179+
Add properties to be set per-message here, such as 'Content-Type', 'Priority'
180+
181+
Example:
182+
[source,ruby]
183+
message_properties => { "priority" => "1" }
184+
185+
172186
[id="plugins-{type}s-{plugin}-passive"]
173187
===== `passive`
174188

lib/logstash/outputs/rabbitmq.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,18 @@ class RabbitMQ < LogStash::Outputs::Base
3838
# Should RabbitMQ persist messages to disk?
3939
config :persistent, :validate => :boolean, :default => true
4040

41+
# Properties to be passed along with the message
42+
config :message_properties, :validate => :hash, :default => {}
43+
4144
def register
4245
connect!
4346
@hare_info.exchange = declare_exchange!(@hare_info.channel, @exchange, @exchange_type, @durable)
4447
@codec.on_event(&method(:publish))
4548
end
4649

50+
def symbolize(myhash)
51+
Hash[myhash.map{|(k,v)| [k.to_sym,v]}]
52+
end
4753

4854
def receive(event)
4955
@codec.encode(event)
@@ -53,7 +59,7 @@ def receive(event)
5359

5460
def publish(event, message)
5561
raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
56-
@hare_info.exchange.publish(message, :routing_key => event.sprintf(@key), :properties => { :persistent => @persistent })
62+
@hare_info.exchange.publish(message, :routing_key => event.sprintf(@key), :properties => symbolize(@message_properties.merge(:persistent => @persistent)))
5763
rescue MarchHare::Exception, IOError, AlreadyClosedException, TimeoutException => e
5864
@logger.error("Error while publishing. Will retry.",
5965
:message => e.message,

logstash-output-rabbitmq.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-rabbitmq'
3-
s.version = '4.0.8'
3+
s.version = '4.0.9'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Push events to a RabbitMQ exchange"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/outputs/rabbitmq_spec.rb

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,16 @@
135135
let(:klass) { LogStash::Outputs::RabbitMQ }
136136
let(:exchange) { "myexchange" }
137137
let(:exchange_type) { "topic" }
138+
let(:priority) { 34 }
138139
let(:default_plugin_config) {
139140
{
140141
"host" => "127.0.0.1",
141142
"exchange" => exchange,
142143
"exchange_type" => exchange_type,
143-
"key" => "foo"
144+
"key" => "foo",
145+
"message_properties" => {
146+
"priority" => priority
147+
}
144148
}
145149
}
146150
let(:config) { default_plugin_config }
@@ -158,7 +162,7 @@ def spawn_and_wait(instance)
158162
# Extra time to make sure the output can attach
159163
sleep 1
160164
end
161-
165+
let(:message) { LogStash::Event.new("message" => "Foo Message", "extra_field" => "Blah") }
162166
let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
163167
let(:test_channel) { test_connection.create_channel }
164168
let(:test_queue) {
@@ -190,6 +194,14 @@ def spawn_and_wait(instance)
190194
instance.close
191195
expect(instance.connected?).to be_falsey
192196
end
197+
198+
it 'applies per message settings' do
199+
instance.receive(message)
200+
sleep 1.0
201+
202+
message, payload = test_queue.pop
203+
expect(message.properties.to_s).to include("priority=#{priority}")
204+
end
193205
end
194206

195207
describe "sending a message with an exchange specified" do

0 commit comments

Comments
 (0)