Skip to content

Commit fbd0997

Browse files
author
子懿
committed
[CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond, numBytesOutPerSecond metrics for RemoteShuffleServiceFactory
1 parent cbf4a14 commit fbd0997

File tree

32 files changed

+275
-94
lines changed

32 files changed

+275
-94
lines changed

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGate.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
3434
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
3535
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
36+
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
3637
import org.apache.flink.util.FlinkRuntimeException;
3738
import org.apache.flink.util.function.SupplierWithException;
3839

@@ -45,7 +46,7 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate {
4546

4647
public AbstractRemoteShuffleInputGate(
4748
CelebornConf celebornConf,
48-
String taskName,
49+
ShuffleIOOwnerContext ownerContext,
4950
int gateIndex,
5051
InputGateDeploymentDescriptor gateDescriptor,
5152
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -55,7 +56,7 @@ public AbstractRemoteShuffleInputGate(
5556
inputGateDelegation =
5657
new RemoteShuffleInputGateDelegation(
5758
celebornConf,
58-
taskName,
59+
ownerContext,
5960
gateIndex,
6061
gateDescriptor,
6162
bufferPoolFactory,

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
2626
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
2727
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
28+
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
2829
import org.apache.flink.util.function.SupplierWithException;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@ public AbstractRemoteShuffleInputGateFactory(
8081

8182
/** Create RemoteShuffleInputGate from {@link InputGateDeploymentDescriptor}. */
8283
public IndexedInputGate create(
83-
String owningTaskName, int gateIndex, InputGateDeploymentDescriptor igdd) {
84+
ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
8485
LOG.info(
8586
"Create input gate -- number of buffers per input gate={}, "
8687
+ "number of concurrent readings={}.",
@@ -91,15 +92,15 @@ public IndexedInputGate create(
9192
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
9293

9394
return createInputGate(
94-
owningTaskName,
95+
ownerContext,
9596
gateIndex,
9697
igdd,
9798
bufferPoolFactory,
9899
celebornConf.shuffleCompressionCodec().name());
99100
}
100101

101102
protected abstract IndexedInputGate createInputGate(
102-
String owningTaskName,
103+
ShuffleIOOwnerContext ownerContext,
103104
int gateIndex,
104105
InputGateDeploymentDescriptor igdd,
105106
SupplierWithException<BufferPool, IOException> bufferPoolFactory,

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleResultPartitionFactory.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
3131
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
3232
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
33-
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
33+
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
3434
import org.apache.flink.util.function.SupplierWithException;
3535
import org.slf4j.Logger;
3636
import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ public AbstractRemoteShuffleResultPartitionFactory(
8989
}
9090

9191
public ResultPartition create(
92-
String taskNameWithSubtaskAndId,
92+
ShuffleIOOwnerContext ownerContext,
9393
int partitionIndex,
9494
ResultPartitionDeploymentDescriptor desc,
9595
CelebornConf celebornConf) {
@@ -100,32 +100,32 @@ public ResultPartition create(
100100
desc.getNumberOfSubpartitions());
101101

102102
return create(
103-
taskNameWithSubtaskAndId,
103+
ownerContext,
104104
partitionIndex,
105105
desc.getShuffleDescriptor().getResultPartitionID(),
106106
desc.getPartitionType(),
107107
desc.getNumberOfSubpartitions(),
108108
desc.getMaxParallelism(),
109109
createBufferPoolFactory(),
110-
desc.getShuffleDescriptor(),
110+
(RemoteShuffleDescriptor) desc.getShuffleDescriptor(),
111111
celebornConf,
112112
desc.getTotalNumberOfPartitions());
113113
}
114114

115115
public ResultPartition create(
116-
String taskNameWithSubtaskAndId,
116+
ShuffleIOOwnerContext ownerContext,
117117
int partitionIndex,
118118
ResultPartitionID id,
119119
ResultPartitionType type,
120120
int numSubpartitions,
121121
int maxParallelism,
122122
List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
123-
ShuffleDescriptor shuffleDescriptor,
123+
RemoteShuffleDescriptor shuffleDescriptor,
124124
CelebornConf celebornConf,
125125
int numMappers) {
126126
ResultPartition partition =
127127
createRemoteShuffleResultPartitionInternal(
128-
taskNameWithSubtaskAndId,
128+
ownerContext,
129129
partitionIndex,
130130
id,
131131
type,
@@ -135,13 +135,13 @@ public ResultPartition create(
135135
celebornConf,
136136
numMappers,
137137
getBufferCompressor(),
138-
(RemoteShuffleDescriptor) shuffleDescriptor);
139-
LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
138+
shuffleDescriptor);
139+
LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this);
140140
return partition;
141141
}
142142

143143
abstract ResultPartition createRemoteShuffleResultPartitionInternal(
144-
String taskNameWithSubtaskAndId,
144+
ShuffleIOOwnerContext ownerContext,
145145
int partitionIndex,
146146
ResultPartitionID id,
147147
ResultPartitionType type,

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ private ResultPartitionWriter createResultPartitionWriterInternal(
191191
if (resultPartitionDeploymentDescriptor.getShuffleDescriptor()
192192
instanceof RemoteShuffleDescriptor) {
193193
return resultPartitionFactory.create(
194-
ownerContext.getOwnerName(), index, resultPartitionDeploymentDescriptor, conf);
194+
ownerContext, index, resultPartitionDeploymentDescriptor, conf);
195195
} else {
196196
nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId());
197197
nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId());
@@ -246,7 +246,7 @@ private IndexedInputGate createInputGateInternal(
246246
? shuffleEnvironmentWrapper
247247
.nettyInputGateFactory()
248248
.create(ownerContext, gateIndex, igdd, producerStateProvider, inputChannelMetrics)
249-
: inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, igdd);
249+
: inputGateFactory.create(ownerContext, gateIndex, igdd);
250250
}
251251

252252
@VisibleForTesting

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.stream.IntStream;
2929

3030
import org.apache.commons.lang3.tuple.Pair;
31+
import org.apache.flink.metrics.MetricGroup;
3132
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
3233
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
3334
import org.apache.flink.runtime.event.AbstractEvent;
@@ -43,6 +44,7 @@
4344
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
4445
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
4546
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
47+
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
4648
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
4749
import org.apache.flink.util.ExceptionUtils;
4850
import org.apache.flink.util.function.SupplierWithException;
@@ -56,6 +58,7 @@
5658
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
5759
import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
5860
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
61+
import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
5962
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
6063

6164
public class RemoteShuffleInputGateDelegation {
@@ -130,9 +133,11 @@ public class RemoteShuffleInputGateDelegation {
130133
private int endSubIndex;
131134
private boolean partitionConnectionExceptionEnabled;
132135

136+
private final MetricGroup taskIOMetricGroup;
137+
133138
public RemoteShuffleInputGateDelegation(
134139
CelebornConf celebornConf,
135-
String taskName,
140+
ShuffleIOOwnerContext ownerContext,
136141
int gateIndex,
137142
InputGateDeploymentDescriptor gateDescriptor,
138143
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
@@ -141,7 +146,8 @@ public RemoteShuffleInputGateDelegation(
141146
AvailabilityProvider.AvailabilityHelper availabilityHelper,
142147
int startSubIndex,
143148
int endSubIndex) {
144-
this.taskName = taskName;
149+
this.taskName = ownerContext.getOwnerName();
150+
this.taskIOMetricGroup = ownerContext.getParentGroup();
145151
this.gateIndex = gateIndex;
146152
this.gateDescriptor = gateDescriptor;
147153
this.bufferPoolFactory = bufferPoolFactory;
@@ -198,6 +204,8 @@ private void initShuffleReadClients() {
198204
RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
199205
ShuffleResourceDescriptor shuffleDescriptor =
200206
remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
207+
ShuffleIOMetricGroup shuffleIOMetricGroup =
208+
new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleDescriptor.getShuffleId());
201209

202210
LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);
203211

@@ -208,7 +216,7 @@ private void initShuffleReadClients() {
208216
startSubIndex,
209217
endSubIndex,
210218
transferBufferPool,
211-
getDataListener(descriptor.getLeft()),
219+
getDataListener(descriptor.getLeft(), shuffleIOMetricGroup),
212220
getFailureListener(remoteDescriptor.getResultPartitionID()));
213221

214222
bufferReaders.add(reader);
@@ -235,13 +243,14 @@ private List<InputChannelInfo> createChannelInfos() {
235243
.collect(Collectors.toList());
236244
}
237245

238-
private Consumer<ByteBuf> getDataListener(int channelIdx) {
246+
private Consumer<ByteBuf> getDataListener(
247+
int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
239248
return byteBuf -> {
240249
Queue<Buffer> unpackedBuffers = null;
241250
try {
242251
unpackedBuffers = BufferPacker.unpack(byteBuf);
243252
while (!unpackedBuffers.isEmpty()) {
244-
onBuffer(unpackedBuffers.poll(), channelIdx);
253+
onBuffer(unpackedBuffers.poll(), channelIdx, shuffleIOMetricGroup);
245254
}
246255
} catch (Throwable throwable) {
247256
synchronized (lock) {
@@ -279,7 +288,7 @@ private Consumer<Throwable> getFailureListener(ResultPartitionID rpID) {
279288
};
280289
}
281290

282-
private void onBuffer(Buffer buffer, int channelIdx) {
291+
private void onBuffer(Buffer buffer, int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
283292
synchronized (lock) {
284293
if (closed || cause != null) {
285294
buffer.recycleBuffer();
@@ -293,6 +302,7 @@ private void onBuffer(Buffer buffer, int channelIdx) {
293302
checkState(channelInfo.getInputChannelIdx() == channelIdx, "Illegal channel index.");
294303
LOG.debug("ReceivedBuffers is adding buffer {} on {}", buffer, channelInfo);
295304
receivedBuffers.add(Pair.of(buffer, channelInfo));
305+
shuffleIOMetricGroup.getNumBytesIn().inc(buffer.getSize());
296306
needRecycle = false;
297307
if (wasEmpty) {
298308
availabilityHelper.getUnavailableToResetAvailable().complete(null);

client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Optional;
2222

2323
import org.apache.flink.annotation.VisibleForTesting;
24+
import org.apache.flink.metrics.MetricGroup;
2425
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
2526
import org.apache.flink.runtime.io.network.buffer.Buffer;
2627
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -36,6 +37,7 @@
3637
import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
3738
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
3839
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
40+
import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
3941
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
4042
import org.apache.celeborn.plugin.flink.utils.Utils;
4143

@@ -83,6 +85,7 @@ public class RemoteShuffleOutputGate {
8385
private boolean isRegisterShuffle = false;
8486
private int maxReviveTimes;
8587
private boolean hasSentHandshake = false;
88+
protected final ShuffleIOMetricGroup shuffleIOMetricGroup;
8689

8790
/**
8891
* @param shuffleDesc Describes shuffle meta and shuffle worker address.
@@ -95,8 +98,28 @@ public RemoteShuffleOutputGate(
9598
int bufferSize,
9699
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
97100
CelebornConf celebornConf,
98-
int numMappers) {
101+
int numMappers,
102+
MetricGroup taskIOMetricGroup) {
103+
this(
104+
shuffleDesc,
105+
numSubs,
106+
bufferSize,
107+
bufferPoolFactory,
108+
celebornConf,
109+
numMappers,
110+
taskIOMetricGroup,
111+
null);
112+
}
99113

114+
public RemoteShuffleOutputGate(
115+
RemoteShuffleDescriptor shuffleDesc,
116+
int numSubs,
117+
int bufferSize,
118+
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
119+
CelebornConf celebornConf,
120+
int numMappers,
121+
MetricGroup taskIOMetricGroup,
122+
FlinkShuffleClientImpl flinkShuffleClient) {
100123
this.shuffleDesc = shuffleDesc;
101124
this.numSubs = numSubs;
102125
this.bufferPoolFactory = bufferPoolFactory;
@@ -116,8 +139,9 @@ public RemoteShuffleOutputGate(
116139
this.lifecycleManagerPort = shuffleDesc.getShuffleResource().getLifecycleManagerPort();
117140
this.lifecycleManagerTimestamp =
118141
shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
119-
this.flinkShuffleClient = getShuffleClient();
142+
this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient() : flinkShuffleClient;
120143
this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes();
144+
this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleId);
121145
}
122146

123147
/** Initialize transportation gate. */
@@ -210,14 +234,16 @@ FlinkShuffleClientImpl getShuffleClient() {
210234
/** Writes a piece of data to a subpartition. */
211235
public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
212236
try {
213-
flinkShuffleClient.pushDataToLocation(
214-
shuffleId,
215-
mapId,
216-
attemptId,
217-
bufferHeader.getSubPartitionId(),
218-
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
219-
partitionLocation,
220-
() -> byteBuf.release());
237+
int bytesWritten =
238+
flinkShuffleClient.pushDataToLocation(
239+
shuffleId,
240+
mapId,
241+
attemptId,
242+
bufferHeader.getSubPartitionId(),
243+
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
244+
partitionLocation,
245+
byteBuf::release);
246+
shuffleIOMetricGroup.getNumBytesOut().inc(bytesWritten);
221247
} catch (IOException e) {
222248
Utils.rethrowAsRuntimeException(e);
223249
}

0 commit comments

Comments
 (0)