Commit 5ea0dc8

[Backport to 7.x] Remove unnecessary sleep after exhausted retries (#218)
* Remove unnecessary sleep after exhausted retries. Also fixes up some issues with logging, where the batch size was calculated incorrectly.
1 parent 4b59488 commit 5ea0dc8
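
The gist of the change, per the lib/logstash/outputs/kafka.rb diff below: the backoff sleep now only runs when another attempt is still permitted, so a batch that has exhausted its retry budget no longer pays one final retry_backoff_ms pause. Here is a minimal standalone sketch of that control flow; send_with_backoff and attempt_send are hypothetical names used for illustration, not the plugin's API (the plugin's method is retrying_send, shown in the diff).

# Simplified sketch of the retry/backoff flow after this commit.
# attempt_send is a hypothetical helper that returns the events that failed;
# a nil retry budget means "retry forever", mirroring the plugin's `retries` option.
def send_with_backoff(batch, retries, retry_backoff_ms)
  remaining = retries
  while batch.any?
    break if !remaining.nil? && remaining < 0   # budget exhausted on a prior pass
    remaining -= 1 unless remaining.nil?

    failures = attempt_send(batch)
    break if failures.empty?

    # Only back off (and keep the failures for another pass) if a retry will actually happen.
    if remaining.nil? || remaining >= 0
      sleep(retry_backoff_ms / 1000.0)
      batch = failures
    end
  end
end

# Hypothetical stand-in for the real producer call: pretend every event fails.
def attempt_send(batch)
  batch
end

send_with_backoff(%w[e1 e2], 0, 100)   # with retries=0: one send, no sleep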

5 files changed: +67 -11 lines changed

.travis.yml

Lines changed: 3 additions & 2 deletions
@@ -7,9 +7,9 @@ matrix:
   - rvm: jruby-9.1.13.0
     env: LOGSTASH_BRANCH=master
   - rvm: jruby-9.1.13.0
-    env: LOGSTASH_BRANCH=6.x
+    env: LOGSTASH_BRANCH=6.7
   - rvm: jruby-9.1.13.0
-    env: LOGSTASH_BRANCH=6.0
+    env: LOGSTASH_BRANCH=7.0
   - rvm: jruby-1.7.27
     env: LOGSTASH_BRANCH=5.6
   fast_finish: true
@@ -18,3 +18,4 @@ before_script: ci/build.sh
 script: ci/run.sh
 after_script: ci/cleanup.sh
 jdk: oraclejdk8
+before_install: gem install bundler -v '< 2'

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,6 @@
+## 7.3.2
+  - Fixed issue with unnecessary sleep after retries exhausted [#216](https://github.com/logstash-plugins/logstash-output-kafka/pull/216)
+
 ## 7.3.1
   - Added support for kafka property `ssl.endpoint.identification.algorithm` [#213](https://github.com/logstash-plugins/logstash-output-kafka/pull/213)
 

lib/logstash/outputs/kafka.rb

Lines changed: 9 additions & 7 deletions
@@ -231,7 +231,7 @@ def multi_receive(events)
   end
 
   def retrying_send(batch)
-    remaining = @retries;
+    remaining = @retries
 
     while batch.any?
       if !remaining.nil?
@@ -281,13 +281,15 @@ def retrying_send(batch)
       break if failures.empty?
 
       # Otherwise, retry with any failed transmissions
-      batch = failures
-      delay = @retry_backoff_ms / 1000.0
-      logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
-                  :failures => failures.size, :sleep => delay);
-      sleep(delay)
+      if remaining.nil? || remaining >= 0
+        delay = @retry_backoff_ms / 1000.0
+        logger.info("Sending batch to Kafka failed. Will retry after a delay.", :batch_size => batch.size,
+                    :failures => failures.size,
+                    :sleep => delay)
+        batch = failures
+        sleep(delay)
+      end
     end
-
   end
 
   def close
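
The reordering above is also what fixes the logging issue called out in the commit message: previously batch was reassigned to the failures before the log call, so :batch_size and :failures always reported the same number. A small standalone illustration with hypothetical event values (plain strings, not LogStash events or plugin code):

# Illustration of the logging fix, using strings as stand-ins for events.
batch    = %w[e1 e2 e3 e4]   # four events were attempted in this round
failures = %w[e4]            # one of them failed

# Old order: the batch was replaced by the failures before logging,
# so both fields reported 1.
shrunken = failures
puts "old log: batch_size=#{shrunken.size} failures=#{failures.size}"   # 1 / 1

# New order: log first, then keep only the failures for the next attempt,
# so batch_size reflects what was actually sent.
puts "new log: batch_size=#{batch.size} failures=#{failures.size}"      # 4 / 1
batch = failures   # the next retry pass then operates on just the failures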

logstash-output-kafka.gemspec

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name = 'logstash-output-kafka'
-  s.version = '7.3.1'
+  s.version = '7.3.2'
   s.licenses = ['Apache-2.0']
   s.summary = "Writes events to a Kafka topic"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/unit/outputs/kafka_spec.rb

Lines changed: 51 additions & 1 deletion
@@ -49,7 +49,7 @@
     kafka.register
     kafka.multi_receive([event])
   end
-
+
   it 'should raise config error when truststore location is not set and ssl is enabled' do
     kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("security_protocol" => "SSL"))
     expect { kafka.register }.to raise_error(LogStash::ConfigurationError, /ssl_truststore_location must be set when SSL is enabled/)
@@ -120,6 +120,41 @@
       end
     end
 
+    context 'when retries is 0' do
+      let(:retries) { 0 }
+      let(:max_sends) { 1 }
+
+      it "should should only send once" do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .once
+          .and_wrap_original do |m, *args|
+            # Always fail.
+            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future.run
+            future
+          end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        kafka.register
+        kafka.multi_receive([event])
+      end
+
+      it 'should not sleep' do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .once
+          .and_wrap_original do |m, *args|
+            # Always fail.
+            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future.run
+            future
+          end
+
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        expect(kafka).not_to receive(:sleep).with(anything)
+        kafka.register
+        kafka.multi_receive([event])
+      end
+    end
+
     context "and when retries is set by the user" do
       let(:retries) { (rand * 10).to_i }
       let(:max_sends) { retries + 1 }
@@ -137,6 +172,21 @@
         kafka.register
         kafka.multi_receive([event])
       end
+
+      it 'should only sleep retries number of times' do
+        expect_any_instance_of(org.apache.kafka.clients.producer.KafkaProducer).to receive(:send)
+          .at_most(max_sends)
+          .and_wrap_original do |m, *args|
+            # Always fail.
+            future = java.util.concurrent.FutureTask.new { raise "Failed" }
+            future.run
+            future
+          end
+        kafka = LogStash::Outputs::Kafka.new(simple_kafka_config.merge("retries" => retries))
+        expect(kafka).to receive(:sleep).exactly(retries).times
+        kafka.register
+        kafka.multi_receive([event])
+      end
     end
   end
 end
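
For context on the stubbing pattern used in these specs: running a java.util.concurrent.FutureTask whose block raises yields a future whose get rethrows the failure, which is how the tests simulate a Kafka send that always fails. A minimal standalone JRuby sketch of that behavior, illustrative only and outside the plugin's spec harness:

# Minimal JRuby sketch of the always-failing future used by the specs above.
require 'java'

future = java.util.concurrent.FutureTask.new { raise 'Failed' }
future.run   # runs the block immediately; the exception is captured, not raised here

begin
  future.get # rethrows the captured failure, wrapped in an ExecutionException
rescue java.util.concurrent.ExecutionException => e
  puts "send failed: #{e.cause}"
end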
