33
33
import java .util .List ;
34
34
import java .util .Map ;
35
35
import java .util .Objects ;
36
- import java .util .UUID ;
37
36
import java .util .concurrent .CompletableFuture ;
38
37
import java .util .concurrent .ConcurrentHashMap ;
39
38
import java .util .concurrent .ExecutionException ;
42
41
import org .apache .hadoop .hdds .HddsUtils ;
43
42
import org .apache .hadoop .hdds .conf .ConfigurationSource ;
44
43
import org .apache .hadoop .hdds .protocol .DatanodeDetails ;
44
+ import org .apache .hadoop .hdds .protocol .DatanodeID ;
45
45
import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos ;
46
46
import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .ContainerCommandRequestProto ;
47
47
import org .apache .hadoop .hdds .protocol .datanode .proto .ContainerProtos .ContainerCommandResponseProto ;
@@ -88,9 +88,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
88
88
LoggerFactory .getLogger (XceiverClientGrpc .class );
89
89
private final Pipeline pipeline ;
90
90
private final ConfigurationSource config ;
91
- private final Map <UUID , XceiverClientProtocolServiceStub > asyncStubs ;
91
+ private final Map <DatanodeID , XceiverClientProtocolServiceStub > asyncStubs ;
92
92
private final XceiverClientMetrics metrics ;
93
- private final Map <UUID , ManagedChannel > channels ;
93
+ private final Map <DatanodeID , ManagedChannel > channels ;
94
94
private final Semaphore semaphore ;
95
95
private long timeout ;
96
96
private final SecurityConfig secConfig ;
@@ -178,8 +178,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
178
178
ManagedChannel channel = createChannel (dn , port ).build ();
179
179
XceiverClientProtocolServiceStub asyncStub =
180
180
XceiverClientProtocolServiceGrpc .newStub (channel );
181
- asyncStubs .put (dn .getUuid (), asyncStub );
182
- channels .put (dn .getUuid (), channel );
181
+ asyncStubs .put (dn .getID (), asyncStub );
182
+ channels .put (dn .getID (), channel );
183
183
}
184
184
185
185
protected NettyChannelBuilder createChannel (DatanodeDetails dn , int port )
@@ -213,7 +213,7 @@ protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
213
213
*/
214
214
@ VisibleForTesting
215
215
public boolean isConnected (DatanodeDetails details ) {
216
- return isConnected (channels .get (details .getUuid ()));
216
+ return isConnected (channels .get (details .getID ()));
217
217
}
218
218
219
219
private boolean isConnected (ManagedChannel channel ) {
@@ -567,7 +567,7 @@ public XceiverClientReply sendCommandAsync(
567
567
ContainerCommandRequestProto request , DatanodeDetails dn )
568
568
throws IOException , InterruptedException {
569
569
checkOpen (dn );
570
- UUID dnId = dn .getUuid ();
570
+ DatanodeID dnId = dn .getID ();
571
571
if (LOG .isDebugEnabled ()) {
572
572
LOG .debug ("Send command {} to datanode {}" ,
573
573
request .getCmdType (), dn .getIpAddress ());
@@ -625,7 +625,7 @@ private synchronized void checkOpen(DatanodeDetails dn)
625
625
throw new IOException ("This channel is not connected." );
626
626
}
627
627
628
- ManagedChannel channel = channels .get (dn .getUuid ());
628
+ ManagedChannel channel = channels .get (dn .getID ());
629
629
// If the channel doesn't exist for this specific datanode or the channel
630
630
// is closed, just reconnect
631
631
if (!isConnected (channel )) {
@@ -639,7 +639,7 @@ private void reconnect(DatanodeDetails dn)
639
639
ManagedChannel channel ;
640
640
try {
641
641
connectToDatanode (dn );
642
- channel = channels .get (dn .getUuid ());
642
+ channel = channels .get (dn .getID ());
643
643
} catch (Exception e ) {
644
644
throw new IOException ("Error while connecting" , e );
645
645
}
0 commit comments