Skip to content

[CELEBORN-2005] Introduce numBytesIn, numBytesOut, numBytesInPerSecond, numBytesOutPerSecond metrics for RemoteShuffleServiceFactory #3272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.SupplierWithException;

Expand All @@ -45,7 +46,7 @@ public abstract class AbstractRemoteShuffleInputGate extends IndexedInputGate {

public AbstractRemoteShuffleInputGate(
CelebornConf celebornConf,
String taskName,
ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
Expand All @@ -55,7 +56,7 @@ public AbstractRemoteShuffleInputGate(
inputGateDelegation =
new RemoteShuffleInputGateDelegation(
celebornConf,
taskName,
ownerContext,
gateIndex,
gateDescriptor,
bufferPoolFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,7 +81,7 @@ public AbstractRemoteShuffleInputGateFactory(

/** Create RemoteShuffleInputGate from {@link InputGateDeploymentDescriptor}. */
public IndexedInputGate create(
String owningTaskName, int gateIndex, InputGateDeploymentDescriptor igdd) {
ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
LOG.info(
"Create input gate -- number of buffers per input gate={}, "
+ "number of concurrent readings={}.",
Expand All @@ -91,15 +92,15 @@ public IndexedInputGate create(
createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);

return createInputGate(
owningTaskName,
ownerContext,
gateIndex,
igdd,
bufferPoolFactory,
celebornConf.shuffleCompressionCodec().name());
}

protected abstract IndexedInputGate createInputGate(
String owningTaskName,
ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor igdd,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -89,7 +89,7 @@ public AbstractRemoteShuffleResultPartitionFactory(
}

public ResultPartition create(
String taskNameWithSubtaskAndId,
ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionDeploymentDescriptor desc,
CelebornConf celebornConf) {
Expand All @@ -100,32 +100,32 @@ public ResultPartition create(
desc.getNumberOfSubpartitions());

return create(
taskNameWithSubtaskAndId,
ownerContext,
partitionIndex,
desc.getShuffleDescriptor().getResultPartitionID(),
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
createBufferPoolFactory(),
desc.getShuffleDescriptor(),
(RemoteShuffleDescriptor) desc.getShuffleDescriptor(),
celebornConf,
desc.getTotalNumberOfPartitions());
}

public ResultPartition create(
String taskNameWithSubtaskAndId,
ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
int numSubpartitions,
int maxParallelism,
List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
ShuffleDescriptor shuffleDescriptor,
RemoteShuffleDescriptor shuffleDescriptor,
CelebornConf celebornConf,
int numMappers) {
ResultPartition partition =
createRemoteShuffleResultPartitionInternal(
taskNameWithSubtaskAndId,
ownerContext,
partitionIndex,
id,
type,
Expand All @@ -135,13 +135,13 @@ public ResultPartition create(
celebornConf,
numMappers,
getBufferCompressor(),
(RemoteShuffleDescriptor) shuffleDescriptor);
LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
shuffleDescriptor);
LOG.debug("{}: Initialized {}", ownerContext.getOwnerName(), this);
return partition;
}

abstract ResultPartition createRemoteShuffleResultPartitionInternal(
String taskNameWithSubtaskAndId,
ShuffleIOOwnerContext ownerContext,
int partitionIndex,
ResultPartitionID id,
ResultPartitionType type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ private ResultPartitionWriter createResultPartitionWriterInternal(
if (resultPartitionDeploymentDescriptor.getShuffleDescriptor()
instanceof RemoteShuffleDescriptor) {
return resultPartitionFactory.create(
ownerContext.getOwnerName(), index, resultPartitionDeploymentDescriptor, conf);
ownerContext, index, resultPartitionDeploymentDescriptor, conf);
} else {
nettyResultIds.add(resultPartitionDeploymentDescriptor.getResultId());
nettyResultPartitionIds.add(resultPartitionDeploymentDescriptor.getPartitionId());
Expand Down Expand Up @@ -246,7 +246,7 @@ private IndexedInputGate createInputGateInternal(
? shuffleEnvironmentWrapper
.nettyInputGateFactory()
.create(ownerContext, gateIndex, igdd, producerStateProvider, inputChannelMetrics)
: inputGateFactory.create(ownerContext.getOwnerName(), gateIndex, igdd);
: inputGateFactory.create(ownerContext, gateIndex, igdd);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.stream.IntStream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.event.AbstractEvent;
Expand All @@ -43,6 +44,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.SupplierWithException;
Expand All @@ -56,6 +58,7 @@
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.buffer.TransferBufferPool;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;

public class RemoteShuffleInputGateDelegation {
Expand Down Expand Up @@ -130,9 +133,11 @@ public class RemoteShuffleInputGateDelegation {
private int endSubIndex;
private boolean partitionConnectionExceptionEnabled;

private final MetricGroup taskIOMetricGroup;

public RemoteShuffleInputGateDelegation(
CelebornConf celebornConf,
String taskName,
ShuffleIOOwnerContext ownerContext,
int gateIndex,
InputGateDeploymentDescriptor gateDescriptor,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
Expand All @@ -141,7 +146,8 @@ public RemoteShuffleInputGateDelegation(
AvailabilityProvider.AvailabilityHelper availabilityHelper,
int startSubIndex,
int endSubIndex) {
this.taskName = taskName;
this.taskName = ownerContext.getOwnerName();
this.taskIOMetricGroup = ownerContext.getParentGroup();
this.gateIndex = gateIndex;
this.gateDescriptor = gateDescriptor;
this.bufferPoolFactory = bufferPoolFactory;
Expand Down Expand Up @@ -198,6 +204,8 @@ private void initShuffleReadClients() {
RemoteShuffleDescriptor remoteDescriptor = (RemoteShuffleDescriptor) descriptor.getRight();
ShuffleResourceDescriptor shuffleDescriptor =
remoteDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
ShuffleIOMetricGroup shuffleIOMetricGroup =
new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleDescriptor.getShuffleId());

LOG.debug("create shuffle reader for descriptor {}", shuffleDescriptor);

Expand All @@ -208,7 +216,7 @@ private void initShuffleReadClients() {
startSubIndex,
endSubIndex,
transferBufferPool,
getDataListener(descriptor.getLeft()),
getDataListener(descriptor.getLeft(), shuffleIOMetricGroup),
getFailureListener(remoteDescriptor.getResultPartitionID()));

bufferReaders.add(reader);
Expand All @@ -235,13 +243,14 @@ private List<InputChannelInfo> createChannelInfos() {
.collect(Collectors.toList());
}

private Consumer<ByteBuf> getDataListener(int channelIdx) {
private Consumer<ByteBuf> getDataListener(
int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
return byteBuf -> {
Queue<Buffer> unpackedBuffers = null;
try {
unpackedBuffers = BufferPacker.unpack(byteBuf);
while (!unpackedBuffers.isEmpty()) {
onBuffer(unpackedBuffers.poll(), channelIdx);
onBuffer(unpackedBuffers.poll(), channelIdx, shuffleIOMetricGroup);
}
} catch (Throwable throwable) {
synchronized (lock) {
Expand Down Expand Up @@ -279,7 +288,7 @@ private Consumer<Throwable> getFailureListener(ResultPartitionID rpID) {
};
}

private void onBuffer(Buffer buffer, int channelIdx) {
private void onBuffer(Buffer buffer, int channelIdx, ShuffleIOMetricGroup shuffleIOMetricGroup) {
synchronized (lock) {
if (closed || cause != null) {
buffer.recycleBuffer();
Expand All @@ -293,6 +302,7 @@ private void onBuffer(Buffer buffer, int channelIdx) {
checkState(channelInfo.getInputChannelIdx() == channelIdx, "Illegal channel index.");
LOG.debug("ReceivedBuffers is adding buffer {} on {}", buffer, channelInfo);
receivedBuffers.add(Pair.of(buffer, channelInfo));
shuffleIOMetricGroup.getNumBytesIn().inc(buffer.getSize());
needRecycle = false;
if (wasEmpty) {
availabilityHelper.getUnavailableToResetAvailable().complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Optional;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
Expand All @@ -36,6 +37,7 @@
import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
import org.apache.celeborn.plugin.flink.client.FlinkShuffleClientImpl;
import org.apache.celeborn.plugin.flink.metric.ShuffleIOMetricGroup;
import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;

Expand Down Expand Up @@ -83,6 +85,7 @@ public class RemoteShuffleOutputGate {
private boolean isRegisterShuffle = false;
private int maxReviveTimes;
private boolean hasSentHandshake = false;
protected final ShuffleIOMetricGroup shuffleIOMetricGroup;

/**
* @param shuffleDesc Describes shuffle meta and shuffle worker address.
Expand All @@ -95,8 +98,28 @@ public RemoteShuffleOutputGate(
int bufferSize,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
CelebornConf celebornConf,
int numMappers) {
int numMappers,
MetricGroup taskIOMetricGroup) {
this(
shuffleDesc,
numSubs,
bufferSize,
bufferPoolFactory,
celebornConf,
numMappers,
taskIOMetricGroup,
null);
}

public RemoteShuffleOutputGate(
RemoteShuffleDescriptor shuffleDesc,
int numSubs,
int bufferSize,
SupplierWithException<BufferPool, IOException> bufferPoolFactory,
CelebornConf celebornConf,
int numMappers,
MetricGroup taskIOMetricGroup,
FlinkShuffleClientImpl flinkShuffleClient) {
this.shuffleDesc = shuffleDesc;
this.numSubs = numSubs;
this.bufferPoolFactory = bufferPoolFactory;
Expand All @@ -116,8 +139,9 @@ public RemoteShuffleOutputGate(
this.lifecycleManagerPort = shuffleDesc.getShuffleResource().getLifecycleManagerPort();
this.lifecycleManagerTimestamp =
shuffleDesc.getShuffleResource().getLifecycleManagerTimestamp();
this.flinkShuffleClient = getShuffleClient();
this.flinkShuffleClient = flinkShuffleClient == null ? getShuffleClient() : flinkShuffleClient;
this.maxReviveTimes = celebornConf.clientPushMaxReviveTimes();
this.shuffleIOMetricGroup = new ShuffleIOMetricGroup(taskIOMetricGroup, shuffleId);
}

/** Initialize transportation gate. */
Expand Down Expand Up @@ -210,14 +234,16 @@ FlinkShuffleClientImpl getShuffleClient() {
/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, BufferHeader bufferHeader) {
try {
flinkShuffleClient.pushDataToLocation(
shuffleId,
mapId,
attemptId,
bufferHeader.getSubPartitionId(),
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
partitionLocation,
() -> byteBuf.release());
int bytesWritten =
flinkShuffleClient.pushDataToLocation(
shuffleId,
mapId,
attemptId,
bufferHeader.getSubPartitionId(),
io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
partitionLocation,
byteBuf::release);
shuffleIOMetricGroup.getNumBytesOut().inc(bytesWritten);
} catch (IOException e) {
Utils.rethrowAsRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void emit(

DataBuffer dataBuffer = isBroadcast ? getBroadcastDataBuffer() : getUnicastDataBuffer();
if (dataBuffer.append(record, targetSubpartition, dataType)) {
incNumRecordsOut(dataType);
return;
}

Expand All @@ -117,6 +118,7 @@ public void emit(
dataBuffer.finish();
dataBuffer.release();
writeLargeRecord(record, targetSubpartition, dataType, isBroadcast);
incNumRecordsOut(dataType);
return;
}
flushDataBuffer(dataBuffer, isBroadcast);
Expand All @@ -127,6 +129,12 @@ public void emit(
emit(record, targetSubpartition, dataType, isBroadcast);
}

/**
 * Bumps the {@code numRecordsOut} counter exposed by the output gate's shuffle IO metric group.
 *
 * <p>The counter is only incremented when {@link Buffer.DataType#isBuffer()} reports {@code true}
 * for the emitted data type; any other data type leaves the counter untouched.
 *
 * @param dataType data type of the record that has just been emitted
 */
private void incNumRecordsOut(Buffer.DataType dataType) {
  // Nothing to count unless the emitted data type is a buffer.
  if (!dataType.isBuffer()) {
    return;
  }
  outputGate.shuffleIOMetricGroup.getNumRecordsOut().inc();
}

@VisibleForTesting
public DataBuffer getUnicastDataBuffer() throws IOException {
flushBroadcastDataBuffer();
Expand Down
Loading
Loading