Skip to content

HDDS-12969. Use DatanodeID in XceiverClientGrpc #8430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
HDDS-12969. Replace UUID with DatanodeID
  • Loading branch information
Jimmyweng006 committed May 10, 2025
commit f7e15ef651d13c8ca7ffac25a5ed962e4f2e8a92
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -42,6 +41,7 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
Expand Down Expand Up @@ -88,9 +88,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
LoggerFactory.getLogger(XceiverClientGrpc.class);
private final Pipeline pipeline;
private final ConfigurationSource config;
private final Map<UUID, XceiverClientProtocolServiceStub> asyncStubs;
private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
private final XceiverClientMetrics metrics;
private final Map<UUID, ManagedChannel> channels;
private final Map<DatanodeID, ManagedChannel> channels;
private final Semaphore semaphore;
private long timeout;
private final SecurityConfig secConfig;
Expand Down Expand Up @@ -178,8 +178,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
ManagedChannel channel = createChannel(dn, port).build();
XceiverClientProtocolServiceStub asyncStub =
XceiverClientProtocolServiceGrpc.newStub(channel);
asyncStubs.put(dn.getUuid(), asyncStub);
channels.put(dn.getUuid(), channel);
asyncStubs.put(dn.getID(), asyncStub);
channels.put(dn.getID(), channel);
}

protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
Expand Down Expand Up @@ -213,7 +213,7 @@ protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
*/
@VisibleForTesting
public boolean isConnected(DatanodeDetails details) {
return isConnected(channels.get(details.getUuid()));
return isConnected(channels.get(details.getID()));
}

private boolean isConnected(ManagedChannel channel) {
Expand Down Expand Up @@ -567,7 +567,7 @@ public XceiverClientReply sendCommandAsync(
ContainerCommandRequestProto request, DatanodeDetails dn)
throws IOException, InterruptedException {
checkOpen(dn);
UUID dnId = dn.getUuid();
DatanodeID dnId = dn.getID();
if (LOG.isDebugEnabled()) {
LOG.debug("Send command {} to datanode {}",
request.getCmdType(), dn.getIpAddress());
Expand Down Expand Up @@ -625,7 +625,7 @@ private synchronized void checkOpen(DatanodeDetails dn)
throw new IOException("This channel is not connected.");
}

ManagedChannel channel = channels.get(dn.getUuid());
ManagedChannel channel = channels.get(dn.getID());
// If the channel doesn't exist for this specific datanode or the channel
// is closed, just reconnect
if (!isConnected(channel)) {
Expand All @@ -639,7 +639,7 @@ private void reconnect(DatanodeDetails dn)
ManagedChannel channel;
try {
connectToDatanode(dn);
channel = channels.get(dn.getUuid());
channel = channels.get(dn.getID());
} catch (Exception e) {
throw new IOException("Error while connecting", e);
}
Expand Down