Update Kafka connector to 2.3.1 #13709
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaThreadIdentifier.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaThreadIdentifier.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java
Outdated
zhenxiao
left a comment
Thank you, @dcfocus.
I added some comments.
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConsumerManager.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableLayoutHandle.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplit.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
Outdated
presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java
Outdated
s/offset_start_ts/startOffsetTimestamp/g
Generally, we do not use underscores in toString output.
Fixed.
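As a rough illustration of the naming convention requested above, here is a minimal sketch. The class `ExampleSplit` and its `topicName` field are hypothetical and not taken from the PR; only the key `startOffsetTimestamp` comes from the review comment. The real connector may build its strings differently.

```java
// Hypothetical class for illustration only; everything except the
// startOffsetTimestamp key is an assumption, not code from the PR.
final class ExampleSplit
{
    private final String topicName;
    private final long startOffsetTimestamp;

    ExampleSplit(String topicName, long startOffsetTimestamp)
    {
        this.topicName = topicName;
        this.startOffsetTimestamp = startOffsetTimestamp;
    }

    @Override
    public String toString()
    {
        // Keys mirror the camelCase field names, with no underscores.
        return "ExampleSplit{topicName=" + topicName
                + ", startOffsetTimestamp=" + startOffsetTimestamp + "}";
    }
}
```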
Hi @zhenxiao, thanks for all the comments. Very helpful!
pom.xml
Outdated
Any reason this isn't higher?
It was to stay consistent with kafka_2.12.
I'm not sure there's a correlation between the Scala version and Kafka version.
If Presto needs Scala 2.12, that's fine, but I mean, why not target Kafka 2.3.0, the latest version, for example?
Thanks, Jordan, for the explanation. Yes, you are right. There shouldn't be a correlation. I will update to the latest compatible version.
same question, shall we use kafka 2.3.1, the latest version?
Right, but I keep running into this issue when I try 2.0.0+:
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.record.RecordFormat
Should these be ByteBuffer if that's your deserializer type?
Good catch. Thanks for pointing that out.
Should you add <ByteBuffer, ByteBuffer>?
Fixed.
Are types needed here?
Fixed.
Removed change 2 after discussion with @zhenxiao. For most companies, Kafka topics are not that big, so we may not see a significant performance gain from pushing down getOffsetByTimeStamp.
zhenxiao
left a comment
thank you, @dcfocus
A few more comments
s/props/properties/g
Fixed.
s/props/properties/g ?
static import these ConsumerConfig constants
Fixed.
why set classloader to null?
Fixed. It was there to get rid of an exception during experiments.
use Thread.currentThread().getName(), instead of creating threadId
Fixed.
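For readers following along, the suggestion can be sketched roughly as below: key per-thread state by `Thread.currentThread().getName()` rather than by a custom identifier class. `PerThreadCache` is a hypothetical name, and the PR's actual consumer manager may look quite different. Note that Java does not guarantee thread names are unique, so this assumes the executor assigns distinct names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical per-thread cache keyed by thread name; a sketch, not the
// connector's actual class.
final class PerThreadCache<T>
{
    private final Map<String, T> instances = new ConcurrentHashMap<>();
    private final Supplier<T> factory;

    PerThreadCache(Supplier<T> factory)
    {
        this.factory = factory;
    }

    // Returns the instance bound to the calling thread's name,
    // creating it on first use.
    T get()
    {
        return instances.computeIfAbsent(
                Thread.currentThread().getName(),
                name -> factory.get());
    }

    int size()
    {
        return instances.size();
    }
}
```

Repeated calls to `get()` from the same thread return the same instance, which is the property a per-thread consumer would presumably rely on.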
I still feel 1s is too long, how about 500ms?
Fixed.
use thread getName(), instead of getId()
Fixed.
Would it be better to return a pair of longs instead of a long array?
Or we could inline this function.
Good point. Inlined, fixed.
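The author chose to inline the helper, but the first alternative (a pair of longs instead of a `long[]`) could look something like the sketch below. `OffsetRange` and its accessors are hypothetical names introduced only for illustration.

```java
// Hypothetical value class showing the "pair of longs" alternative;
// the PR itself inlined the function rather than adding such a class.
final class OffsetRange
{
    private final long start;
    private final long end;

    OffsetRange(long start, long end)
    {
        if (start > end) {
            throw new IllegalArgumentException("start must not exceed end");
        }
        this.start = start;
        this.end = end;
    }

    long getStart()
    {
        return start;
    }

    long getEnd()
    {
        return end;
    }

    // Number of offsets covered by the half-open range [start, end).
    long size()
    {
        return end - start;
    }
}
```

Named accessors make call sites self-describing, whereas `offsets[0]` and `offsets[1]` leave the reader to guess which element is which.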
move this line above the for loop
Fixed.
do we need KafkaCluster interface? It only has one implementation
we might just need one class with selectRandomServer()
Fixed.
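A minimal sketch of the "one class with selectRandomServer()" idea follows. Only the method name comes from the review comment; the class name `KafkaServers` and the use of plain `host:port` strings are assumptions, and the merged code may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical single concrete class replacing an interface that had
// only one implementation.
final class KafkaServers
{
    private final List<String> servers;

    KafkaServers(List<String> servers)
    {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("servers is empty");
        }
        // Defensive copy so later changes to the caller's list have no effect.
        this.servers = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    // Picks one configured server uniformly at random.
    String selectRandomServer()
    {
        int index = ThreadLocalRandom.current().nextInt(servers.size());
        return servers.get(index);
    }
}
```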
pom.xml
Outdated
same question, shall we use kafka 2.3.1, the latest version?
zhenxiao
left a comment
almost there, thank you @dcfocus
please update documentation:
./presto-docs/src/main/sphinx/connector/kafka.rst
./presto-docs/src/main/sphinx/connector/kafka-tutorial.rst
And your PR title should be: "Update Kafka connector to 2.3.1"
s/props/properties/g ?
There's no need to move leader here, is there?
Fixed.
Hi @Cricket007, once the tests pass, I am mostly good with this PR.
OneCricketeer
left a comment
Mostly satisfied ✅
Minor comments
pom.xml
Outdated
Could we extract this version as a property?
Fixed.
The default partitioner is already the default. Does it need to be specified?
Fixed. Thanks for catching this one.
Thanks to both. Very helpful comments.
zhenxiao
left a comment
Looks good to me.
A few minor issues.
pom.xml
Outdated
kafka.version
pom.xml
Outdated
just use kafka.version, no need to create a separate clients version
Apache Kafka 2.3.1+ is supported
1024Kb * 1024 = 1MB?
zhenxiao
left a comment
Looks good to me.
Once the tests pass, I will merge.
zhenxiao
left a comment
one more comment
pom.xml
Outdated
could we add this dependency to presto-kafka/pom.xml?
@dcfocus could you please fix the test failures?
presto-server/pom.xml
Outdated
Why are we making these changes? This is for presto-server; better not to pollute the lib.
523e809 to 8f77144
8f77144 to 687dc04
I see nothing wrong with try-with-resources.
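For context, here is a small self-contained sketch of what try-with-resources guarantees. `TrackedResource` is a hypothetical stand-in; in the connector the resource would presumably be a Kafka consumer, but that is an assumption since the diff is not shown here.

```java
// Hypothetical resource that records whether close() was called.
final class TrackedResource implements AutoCloseable
{
    private boolean closed;

    String read()
    {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        return "data";
    }

    @Override
    public void close()
    {
        closed = true;
    }

    boolean isClosed()
    {
        return closed;
    }
}

final class TryWithResourcesExample
{
    // Uses the resource, then returns it so the caller can observe that
    // close() ran automatically when the try block exited.
    static TrackedResource useAndClose()
    {
        TrackedResource used;
        try (TrackedResource resource = new TrackedResource()) {
            resource.read();
            used = resource;
        }
        return used;
    }
}
```

The resource is closed on both normal and exceptional exit from the block, with no explicit `finally` needed.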
b6198ce to 1ac3805
1ac3805 to 1dd8354
This PR improves Kafka connector implementation and performance.