diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/cli/GenericCli.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/cli/GenericCli.java
index af8413cc43a..152a59d41e2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/cli/GenericCli.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/cli/GenericCli.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.util.ExitUtils;
import picocli.CommandLine;
import picocli.CommandLine.ExitCode;
import picocli.CommandLine.Option;
@@ -78,7 +79,7 @@ public void run(String[] argv) {
int exitCode = execute(argv);
if (exitCode != ExitCode.OK) {
- System.exit(exitCode);
+ ExitUtils.terminate(exitCode, null, null);
}
}
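
Replacing System.exit with Ratis' ExitUtils.terminate makes the exit path loggable and test-friendly: tests can disable the actual JVM exit and assert on the recorded status instead of being killed. A minimal sketch of that pattern, using a hypothetical ExitGuard class (this is not the actual org.apache.ratis.util.ExitUtils implementation):

```java
// Illustrative testable-exit pattern; ExitGuard and its methods are
// hypothetical stand-ins, not the Ratis ExitUtils API.
public final class ExitGuard {
  private static volatile boolean exitDisabled = false;
  private static volatile int lastStatus = 0;

  private ExitGuard() { }

  // Tests call this so a failing code path records the status
  // instead of terminating the JVM (and the test runner with it).
  public static void disableExit() {
    exitDisabled = true;
  }

  public static int getLastStatus() {
    return lastStatus;
  }

  public static void terminate(int status, String message) {
    lastStatus = status;
    if (message != null) {
      System.err.println(message);
    }
    if (!exitDisabled) {
      System.exit(status);
    }
  }
}
```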
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 941df45c2df..c2f79a78657 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -632,7 +632,9 @@ public final class ScmConfigKeys {
public static final String NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY =
"net.topology.node.switch.mapping.impl";
-
+ public static final String HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL
+ = "hdds.container.ratis.statemachine.write.wait.interval";
+ public static final long HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT = 10 * 60 * 1000_000_000L;
/**
* Never constructed.
*/
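
The new default is expressed directly in nanoseconds: 10 * 60 * 1000_000_000L = 600,000,000,000 ns, i.e. 10 minutes, matching the `10m` value added to ozone-default.xml below. A quick sanity check of that arithmetic:

```java
import java.util.concurrent.TimeUnit;

// A quick check (not part of the patch) that the literal default matches
// the "10m" value shipped in ozone-default.xml.
public class WriteWaitDefaultCheck {
  public static void main(String[] args) {
    long literal = 10 * 60 * 1000_000_000L;          // as written in ScmConfigKeys
    long viaTimeUnit = TimeUnit.MINUTES.toNanos(10); // 600_000_000_000 ns
    System.out.println(literal == viaTimeUnit);      // true
  }
}
```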
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
index c7985cf47df..23c8f3bb8a2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
@@ -18,13 +18,15 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.builder.CompareToBuilder;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
+import jakarta.annotation.Nonnull;
+import java.util.Objects;
+import java.util.function.Supplier;
+import net.jcip.annotations.Immutable;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.LongCodec;
+import org.apache.ratis.util.MemoizedSupplier;
/**
* Container ID is an integer that is a value between 1..MAX_CONTAINER ID.
@@ -34,6 +36,7 @@
*
* This class is immutable.
*/
+@Immutable
public final class ContainerID implements Comparable<ContainerID> {
private static final Codec<ContainerID> CODEC = new DelegatedCodec<>(
LongCodec.get(), ContainerID::valueOf, c -> c.id,
@@ -46,16 +49,20 @@ public static Codec<ContainerID> getCodec() {
}
private final long id;
+ private final Supplier<HddsProtos.ContainerID> proto;
+ private final Supplier<Integer> hash;
/**
* Constructs ContainerID.
*
* @param id int
*/
- public ContainerID(long id) {
+ private ContainerID(long id) {
Preconditions.checkState(id >= 0,
"Container ID should be positive. %s.", id);
this.id = id;
+ this.proto = MemoizedSupplier.valueOf(() -> HddsProtos.ContainerID.newBuilder().setId(id).build());
+ this.hash = MemoizedSupplier.valueOf(() -> 61 * 71 + Long.hashCode(id));
}
/**
@@ -80,16 +87,12 @@ public long getId() {
return id;
}
- /**
- * Use proto message.
- */
- @Deprecated
- public byte[] getBytes() {
+ public static byte[] getBytes(long id) {
return LongCodec.get().toPersistedFormat(id);
}
public HddsProtos.ContainerID getProtobuf() {
- return HddsProtos.ContainerID.newBuilder().setId(id).build();
+ return proto.get();
}
public static ContainerID getFromProtobuf(HddsProtos.ContainerID proto) {
@@ -107,25 +110,18 @@ public boolean equals(final Object o) {
}
final ContainerID that = (ContainerID) o;
-
- return new EqualsBuilder()
- .append(id, that.id)
- .isEquals();
+ return this.id == that.id;
}
@Override
public int hashCode() {
- return new HashCodeBuilder(61, 71)
- .append(id)
- .toHashCode();
+ return hash.get();
}
@Override
- public int compareTo(final ContainerID that) {
- Preconditions.checkNotNull(that);
- return new CompareToBuilder()
- .append(this.id, that.id)
- .build();
+ public int compareTo(@Nonnull final ContainerID that) {
+ Objects.requireNonNull(that, "that == null");
+ return Long.compare(this.id, that.id);
}
@Override
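
The rewritten ContainerID memoizes its protobuf form and hash code with Ratis' MemoizedSupplier, which evaluates its delegate once on first use. The literal `61 * 71 + Long.hashCode(id)` reproduces exactly what `new HashCodeBuilder(61, 71).append(id).toHashCode()` computed (initial value 61 times multiplier 71, plus the folded long), so cached hashes stay stable; the new testContainIdHash test below verifies this. A minimal sketch of the compute-once supplier idea (Memoized is illustrative, not the Ratis class):

```java
import java.util.function.Supplier;

// Illustrative compute-once supplier; Ratis' MemoizedSupplier.valueOf(...)
// provides the same lazy, evaluate-once behavior used in ContainerID.
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> initializer;
  private volatile T value; // safe here because the computed values are immutable

  Memoized(Supplier<T> initializer) {
    this.initializer = initializer;
  }

  @Override
  public T get() {
    T v = value;
    if (v == null) {
      synchronized (this) {
        v = value;
        if (v == null) {
          v = initializer.get();
          value = v;
        }
      }
    }
    return v;
  }
}
```

Since ContainerID is immutable and heavily used as a map key, caching the protobuf message and hash avoids rebuilding them on every call.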
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
index 51c1670d195..326b16a7b6f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
@@ -124,7 +124,7 @@ public String complete(String path) {
int i, j;
for (i = 1, j = 1; i < subPath.length && j < (allSchema.size() - 1);) {
if (allSchema.get(j).matchPrefix(subPath[i])) {
- newPath.append(NetConstants.PATH_SEPARATOR_STR + subPath[i]);
+ newPath.append(NetConstants.PATH_SEPARATOR_STR).append(subPath[i]);
i++;
j++;
} else {
@@ -133,7 +133,7 @@ public String complete(String path) {
}
}
if (i == (subPath.length - 1)) {
- newPath.append(NetConstants.PATH_SEPARATOR_STR + subPath[i]);
+ newPath.append(NetConstants.PATH_SEPARATOR_STR).append(subPath[i]);
return newPath.toString();
}
return null;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index f10a7296fb8..997b5c576b7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -522,19 +522,20 @@ public int hashCode() {
@Override
public String toString() {
final StringBuilder b =
- new StringBuilder(getClass().getSimpleName()).append("[");
+ new StringBuilder(getClass().getSimpleName()).append("{");
b.append(" Id: ").append(id.getId());
- b.append(", Nodes: ");
+ b.append(", Nodes: [");
for (DatanodeDetails datanodeDetails : nodeStatus.keySet()) {
- b.append(datanodeDetails);
- b.append(" ReplicaIndex: ").append(this.getReplicaIndex(datanodeDetails));
+ b.append(" {").append(datanodeDetails);
+ b.append(", ReplicaIndex: ").append(this.getReplicaIndex(datanodeDetails)).append("},");
}
+ b.append("]");
b.append(", ReplicationConfig: ").append(replicationConfig);
b.append(", State:").append(getPipelineState());
b.append(", leaderId:").append(leaderId != null ? leaderId.toString() : "");
b.append(", CreationTimestamp").append(getCreationTimestamp()
.atZone(ZoneId.systemDefault()));
- b.append("]");
+ b.append("}");
return b.toString();
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalization.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalization.java
index 131469e4341..0afcc2c7b23 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalization.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/upgrade/UpgradeFinalization.java
@@ -17,6 +17,9 @@
package org.apache.hadoop.ozone.upgrade;
+import static org.apache.hadoop.ozone.upgrade.UpgradeException.ResultCodes.INVALID_REQUEST;
+
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -110,6 +113,60 @@ public Collection<String> msgs() {
}
}
+ public static void handleInvalidRequestAfterInitiatingFinalization(
+ boolean force, UpgradeException e) throws IOException {
+ if (INVALID_REQUEST.equals(e.getResult())) {
+ if (force) {
+ return;
+ }
+ System.err.println("Finalization is already in progress, it is not"
+ + "possible to initiate it again.");
+ e.printStackTrace(System.err);
+ System.err.println("If you want to track progress from a new client"
+ + "for any reason, use --takeover, and the status update will be"
+ + "received by the new client. Note that with forcing to monitor"
+ + "progress from a new client, the old one initiated the upgrade"
+ + "will not be able to monitor the progress further and exit.");
+ throw new IOException("Exiting...");
+ } else {
+ throw e;
+ }
+ }
+
+ public static void emitExitMsg() {
+ System.out.println("Exiting...");
+ }
+
+ public static boolean isFinalized(Status status) {
+ return Status.ALREADY_FINALIZED.equals(status);
+ }
+
+ public static boolean isDone(Status status) {
+ return Status.FINALIZATION_DONE.equals(status);
+ }
+
+ public static boolean isInprogress(Status status) {
+ return Status.FINALIZATION_IN_PROGRESS.equals(status);
+ }
+
+ public static boolean isStarting(Status status) {
+ return Status.STARTING_FINALIZATION.equals(status);
+ }
+
+ public static void emitGeneralErrorMsg() {
+ System.err.println("Finalization was not successful.");
+ }
+
+ public static void emitFinishedMsg(String component) {
+ System.out.println("Finalization of " + component + "'s metadata upgrade "
+ + "finished.");
+ }
+
+ public static void emitCancellationMsg(String component) {
+ System.err.println("Finalization command was cancelled. Note that, this"
+ + "will not cancel finalization in " + component + ". Progress can be"
+ + "monitored in the Ozone Manager's log.");
+ }
private UpgradeFinalization() {
// no instances
}
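
These helpers centralize the client-side messages and status checks shared by the finalization CLIs. As a sketch of how they compose, a hypothetical polling loop (fetchStatus stands in for whatever status RPC the caller uses, and Status is assumed to be the nested enum referenced above; neither is asserted as the real API):

```java
import java.util.function.Supplier;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization;
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.Status;

// Hypothetical client-side polling loop built on the helpers above.
public class FinalizationWatcher {
  public static void waitForFinalization(Supplier<Status> fetchStatus)
      throws InterruptedException {
    while (true) {
      Status status = fetchStatus.get();
      if (UpgradeFinalization.isFinalized(status) || UpgradeFinalization.isDone(status)) {
        UpgradeFinalization.emitExitMsg();
        return;
      }
      if (!UpgradeFinalization.isStarting(status)
          && !UpgradeFinalization.isInprogress(status)) {
        UpgradeFinalization.emitGeneralErrorMsg();
        return;
      }
      Thread.sleep(1000); // poll once a second while finalization runs
    }
  }
}
```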
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f20d606d436..6bf3ca9255e 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -650,18 +650,6 @@
      allow group public LIST access.
    </description>
  </property>
-  <property>
-    <name>ozone.om.user.max.volume</name>
-    <value>1024</value>
-    <tag>OM, MANAGEMENT</tag>
-    <description>
-      The maximum number of volumes a user can have on a cluster.Increasing or
-      decreasing this number has no real impact on ozone cluster. This is
-      defined only for operational purposes. Only an administrator can create a
-      volume, once a volume is created there are no restrictions on the number
-      of buckets or keys inside each bucket a user can create.
-    </description>
-  </property>
  <property>
    <name>ozone.om.db.dirs</name>
@@ -3548,6 +3536,14 @@
      Timeout for the request submitted directly to Ratis in datanode.
    </description>
  </property>
+  <property>
+    <name>hdds.container.ratis.statemachine.write.wait.interval</name>
+    <tag>OZONE, DATANODE</tag>
+    <value>10m</value>
+    <description>
+      Timeout for the write path for container blocks.
+    </description>
+  </property>
  <property>
    <name>hdds.datanode.slow.op.warning.threshold</name>
    <tag>OZONE, DATANODE, PERFORMANCE</tag>
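
The `10m` value is parsed with getTimeDuration, which accepts human-readable durations and converts them to the requested unit, here nanoseconds, mirroring the ContainerStateMachine change below. A small sketch of that read path:

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

// Sketch of how the new property is consumed: "10m" in ozone-default.xml
// is converted to nanoseconds by getTimeDuration.
public class ReadWriteWaitInterval {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    long waitNs = conf.getTimeDuration(
        ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
        ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT,
        TimeUnit.NANOSECONDS);
    System.out.println(waitNs); // 600000000000 with the shipped default
  }
}
```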
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
index 3f2f7f2c09a..f38eceb52ad 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
@@ -28,6 +28,8 @@
import java.time.Duration;
import java.time.Instant;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -40,6 +42,25 @@
*/
public class TestContainerInfo {
+ static int oldHash(long id) {
+ return new HashCodeBuilder(61, 71)
+ .append(id)
+ .toHashCode();
+ }
+
+ static void assertHash(long value) {
+ final ContainerID id = ContainerID.valueOf(value);
+ assertEquals(oldHash(value), id.hashCode(), id::toString);
+ }
+
+ @Test
+ void testContainIdHash() {
+ for (int i = 0; i < 100; i++) {
+ assertHash(i);
+ final long id = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+ assertHash(id);
+ }
+ }
@Test
void getProtobufRatis() {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
index a4ce5b24943..ee0f920d8af 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java
@@ -80,13 +80,13 @@ void testJsonOutput() throws IOException {
report.incrementAndSample(
ReplicationManagerReport.HealthState.UNDER_REPLICATED,
- new ContainerID(1));
+ ContainerID.valueOf(1));
report.incrementAndSample(
ReplicationManagerReport.HealthState.UNDER_REPLICATED,
- new ContainerID(2));
+ ContainerID.valueOf(2));
report.incrementAndSample(
ReplicationManagerReport.HealthState.OVER_REPLICATED,
- new ContainerID(3));
+ ContainerID.valueOf(3));
report.setComplete();
String jsonString = JsonUtils.toJsonStringWithDefaultPrettyPrinter(report);
@@ -124,13 +124,13 @@ void testJsonOutput() throws IOException {
void testContainerIDsCanBeSampled() {
report.incrementAndSample(
ReplicationManagerReport.HealthState.UNDER_REPLICATED,
- new ContainerID(1));
+ ContainerID.valueOf(1));
report.incrementAndSample(
ReplicationManagerReport.HealthState.UNDER_REPLICATED,
- new ContainerID(2));
+ ContainerID.valueOf(2));
report.incrementAndSample(
ReplicationManagerReport.HealthState.OVER_REPLICATED,
- new ContainerID(3));
+ ContainerID.valueOf(3));
assertEquals(2,
report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -141,13 +141,13 @@ void testContainerIDsCanBeSampled() {
List<ContainerID> sample =
report.getSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED);
- assertEquals(new ContainerID(1), sample.get(0));
- assertEquals(new ContainerID(2), sample.get(1));
+ assertEquals(ContainerID.valueOf(1), sample.get(0));
+ assertEquals(ContainerID.valueOf(2), sample.get(1));
assertEquals(2, sample.size());
sample =
report.getSample(ReplicationManagerReport.HealthState.OVER_REPLICATED);
- assertEquals(new ContainerID(3), sample.get(0));
+ assertEquals(ContainerID.valueOf(3), sample.get(0));
assertEquals(1, sample.size());
sample =
@@ -160,13 +160,13 @@ void testSamplesAreLimited() {
for (int i = 0; i < ReplicationManagerReport.SAMPLE_LIMIT * 2; i++) {
report.incrementAndSample(
ReplicationManagerReport.HealthState.UNDER_REPLICATED,
- new ContainerID(i));
+ ContainerID.valueOf(i));
}
List<ContainerID> sample =
report.getSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED);
assertEquals(ReplicationManagerReport.SAMPLE_LIMIT, sample.size());
for (int i = 0; i < ReplicationManagerReport.SAMPLE_LIMIT; i++) {
- assertEquals(new ContainerID(i), sample.get(i));
+ assertEquals(ContainerID.valueOf(i), sample.get(i));
}
}
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index 74347acefa4..a213482a5c8 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -98,7 +98,7 @@ default String getTrimmed(String key, String defaultValue) {
default String[] getTrimmedStrings(String name) {
String valueString = get(name);
- if (null == valueString || valueString.trim().isEmpty()) {
+ if (null == valueString) {
return EMPTY_STRING_ARRAY;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
index 7ff80c7a732..03f0fec1835 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockDeletingServiceMetrics.java
@@ -205,34 +205,23 @@ public long getProcessedTransactionFailCount() {
@Override
public String toString() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("successCount = " + successCount.value()).append("\t")
- .append("successBytes = " + successBytes.value()).append("\t")
- .append("failureCount = " + failureCount.value()).append("\t")
- .append("outOfOrderDeleteBlockTransactionCount = "
- + outOfOrderDeleteBlockTransactionCount.value()).append("\t")
- .append("totalPendingBlockCount = "
- + totalPendingBlockCount.value()).append("\t")
- .append("totalBlockChosenCount = "
- + totalBlockChosenCount.value()).append("\t")
- .append("totalContainerChosenCount = "
- + totalContainerChosenCount.value()).append("\t")
- .append("receivedTransactionCount = "
- + receivedTransactionCount.value()).append("\t")
- .append("receivedRetryTransactionCount = "
- + receivedRetryTransactionCount.value()).append("\t")
- .append("processedTransactionSuccessCount = "
- + processedTransactionSuccessCount.value()).append("\t")
- .append("processedTransactionFailCount = "
- + processedTransactionFailCount.value()).append("\t")
- .append("receivedContainerCount = "
- + receivedContainerCount.value()).append("\t")
- .append("receivedBlockCount = "
- + receivedBlockCount.value()).append("\t")
- .append("markedBlockCount = "
- + markedBlockCount.value()).append("\t")
- .append("totalLockTimeoutTransactionCount = "
- + totalLockTimeoutTransactionCount.value()).append("\t");
+ StringBuilder buffer = new StringBuilder()
+ .append("successCount = ").append(successCount.value()).append("\t")
+ .append("successBytes = ").append(successBytes.value()).append("\t")
+ .append("failureCount = ").append(failureCount.value()).append("\t")
+ .append("outOfOrderDeleteBlockTransactionCount = ")
+ .append(outOfOrderDeleteBlockTransactionCount.value()).append("\t")
+ .append("totalPendingBlockCount = ").append(totalPendingBlockCount.value()).append("\t")
+ .append("totalBlockChosenCount = ").append(totalBlockChosenCount.value()).append("\t")
+ .append("totalContainerChosenCount = ").append(totalContainerChosenCount.value()).append("\t")
+ .append("receivedTransactionCount = ").append(receivedTransactionCount.value()).append("\t")
+ .append("receivedRetryTransactionCount = ").append(receivedRetryTransactionCount.value()).append("\t")
+ .append("processedTransactionSuccessCount = ").append(processedTransactionSuccessCount.value()).append("\t")
+ .append("processedTransactionFailCount = ").append(processedTransactionFailCount.value()).append("\t")
+ .append("receivedContainerCount = ").append(receivedContainerCount.value()).append("\t")
+ .append("receivedBlockCount = ").append(receivedBlockCount.value()).append("\t")
+ .append("markedBlockCount = ").append(markedBlockCount.value()).append("\t")
+ .append("totalLockTimeoutTransactionCount = ").append(totalLockTimeoutTransactionCount.value()).append("\t");
return buffer.toString();
}
}
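
Besides switching from the synchronized StringBuffer to StringBuilder, the rewrite chains append calls instead of concatenating inside them, which avoids building a temporary String per metric. A tiny illustration of the difference (not from the patch):

```java
// Why chained appends are preferred over concatenation inside append(...).
public class AppendChaining {
  public static void main(String[] args) {
    long value = 42;
    StringBuilder b = new StringBuilder();
    b.append("successCount = " + value);       // allocates an intermediate String first
    b.append("successCount = ").append(value); // writes straight into the buffer
    System.out.println(b);
  }
}
```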
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index bc995854a8f..91cfaa5a21a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -102,7 +102,7 @@ public class StateContext {
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
- private final Queue<SCMCommand> commandQueue;
+ private final Queue<SCMCommand<?>> commandQueue;
private final Map<Long, CommandStatus> cmdStatusMap;
private final Lock lock;
private final DatanodeStateMachine parentDatanodeStateMachine;
@@ -738,7 +738,7 @@ public OptionalLong getTermOfLeaderSCM() {
*
* @return SCMCommand or Null.
*/
- public SCMCommand getNextCommand() {
+ public SCMCommand<?> getNextCommand() {
lock.lock();
try {
initTermOfLeaderSCM();
@@ -772,7 +772,7 @@ public SCMCommand getNextCommand() {
*
* @param command - SCMCommand.
*/
- public void addCommand(SCMCommand command) {
+ public void addCommand(SCMCommand<?> command) {
lock.lock();
try {
if (commandQueue.size() >= maxCommandQueueLimit) {
@@ -792,7 +792,7 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();
lock.lock();
try {
- for (SCMCommand cmd : commandQueue) {
+ for (SCMCommand<?> cmd : commandQueue) {
summary.put(cmd.getType(), summary.getOrDefault(cmd.getType(), 0) + 1);
}
} finally {
@@ -832,7 +832,7 @@ public void addCmdStatus(Long key, CommandStatus status) {
*
* @param cmd - {@link SCMCommand}.
*/
- public void addCmdStatus(SCMCommand cmd) {
+ public void addCmdStatus(SCMCommand<?> cmd) {
if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
addCmdStatus(cmd.getId(),
DeleteBlockCommandStatusBuilder.newBuilder()
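
SCMCommand is generic in its protobuf payload, so the raw-type usages here and in the command handlers below are replaced with the SCMCommand<?> wildcard: the queue and handler signatures can carry commands of any payload type while keeping the compiler's generics checks. A self-contained sketch of the same cleanup, with Command<T> standing in for SCMCommand<T> (illustrative, not the real class):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Raw-type-to-wildcard cleanup in miniature; Command<T> is a stand-in
// for SCMCommand<T>, not the real Ozone class.
public class WildcardQueue {
  interface Command<T> {
    T getProto();
  }

  // Queue<Command<?>> accepts commands with any payload type while keeping
  // generics checks, unlike the raw Queue<Command>.
  private static final Queue<Command<?>> QUEUE = new ArrayDeque<>();

  static void add(Command<?> cmd) {
    QUEUE.add(cmd);
  }

  public static void main(String[] args) {
    add((Command<String>) () -> "payload");
    add((Command<Long>) () -> 42L);
    System.out.println(QUEUE.size()); // 2
  }
}
```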
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 700303ee0c9..1a1594cf8a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -84,7 +84,7 @@ public CloseContainerCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 8fcd192fe53..5cbe4726897 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -93,7 +93,7 @@ public ClosePipelineCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 69a40e1f1ad..696b04defe3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -85,7 +85,7 @@ public CommandHandler getDeleteBlocksCommandHandler() {
*
* @param command - SCM Command.
*/
- public void handle(SCMCommand command) {
+ public void handle(SCMCommand<?> command) {
Preconditions.checkNotNull(command);
CommandHandler handler = handlerMap.get(command.getType());
if (handler != null) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 68ab8087d6b..d516977838e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -38,7 +38,7 @@ public interface CommandHandler {
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
- void handle(SCMCommand command, OzoneContainer container,
+ void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager);
/**
@@ -68,7 +68,7 @@ void handle(SCMCommand command, OzoneContainer container,
/**
* Default implementation for updating command status.
*/
- default void updateCommandStatus(StateContext context, SCMCommand command,
+ default void updateCommandStatus(StateContext context, SCMCommand<?> command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.warn("{} with Id:{} not found.", command.getType(),
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index d86c0287516..30ffe7ed415 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -90,7 +90,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 80c078c5087..71277c06377 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -123,7 +123,7 @@ public DeleteBlocksCommandHandler(OzoneContainer container,
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 1d23da794a1..ae036a1c8f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -76,7 +76,7 @@ protected DeleteContainerCommandHandler(Clock clock,
this.opsLatencyMs = registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms");
}
@Override
- public void handle(final SCMCommand command,
+ public void handle(final SCMCommand<?> command,
final OzoneContainer ozoneContainer,
final StateContext context,
final SCMConnectionManager connectionManager) {
@@ -93,7 +93,7 @@ public void handle(final SCMCommand command,
}
}
- private void handleInternal(SCMCommand command, StateContext context,
+ private void handleInternal(SCMCommand<?> command, StateContext context,
DeleteContainerCommand deleteContainerCommand,
ContainerController controller) {
final long startTime = Time.monotonicNow();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
index 6e1c566343d..a27b94b76a3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
@@ -63,7 +63,7 @@ public FinalizeNewLayoutVersionCommandHandler() {
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
invocationCount.incrementAndGet();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index 4366a912188..b2159aa44f7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -47,7 +47,7 @@ public ReconstructECContainersCommandHandler(ConfigurationSource conf,
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
ReconstructECContainersCommand ecContainersCommand =
(ReconstructECContainersCommand) command;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
index bc8b69a50ae..f26329792b0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
@@ -48,7 +48,7 @@ public RefreshVolumeUsageCommandHandler() {
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("receive command to refresh usage info of all volumes");
invocationCount.incrementAndGet();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index d52c51e298e..17bb10fc7ea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -65,7 +65,7 @@ public String getMetricsName() {
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
final ReplicateContainerCommand replicateCommand =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
index 548a5491743..25a158bb45d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -76,7 +76,7 @@ public SetNodeOperationalStateCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 0a1df1088d6..2a9fe61d17f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -35,17 +35,22 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.hadoop.hdds.HddsUtils;
@@ -187,13 +192,38 @@ long getStartTime() {
}
}
+ static class WriteFutures {
+ private final Future<ContainerCommandResponseProto> writeChunkFuture;
+ private final CompletableFuture<Message> raftFuture;
+ private final long startTime;
+
+ WriteFutures(Future<ContainerCommandResponseProto> writeChunkFuture,
+ CompletableFuture<Message> raftFuture, long startTime) {
+ this.writeChunkFuture = writeChunkFuture;
+ this.raftFuture = raftFuture;
+ this.startTime = startTime;
+ }
+
+ public Future<ContainerCommandResponseProto> getWriteChunkFuture() {
+ return writeChunkFuture;
+ }
+
+ public CompletableFuture<Message> getRaftFuture() {
+ return raftFuture;
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+ }
+
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
- private final ConcurrentHashMap<Long, CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
+ private final NavigableMap<Long, WriteFutures> writeChunkFutureMap;
+ private final long writeChunkWaitMaxNs;
// keeps track of the containers created per pipeline
private final Map<Long, Long> container2BCSIDMap;
@@ -229,7 +259,7 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.containerController = containerController;
this.ratisServer = ratisServer;
metrics = CSMMetrics.create(gid);
- this.writeChunkFutureMap = new ConcurrentHashMap<>();
+ this.writeChunkFutureMap = new ConcurrentSkipListMap<>();
applyTransactionCompletionMap = new ConcurrentHashMap<>();
this.unhealthyContainers = ConcurrentHashMap.newKeySet();
long pendingRequestsBytesLimit = (long)conf.getStorageSize(
@@ -273,6 +303,8 @@ public ContainerStateMachine(HddsDatanodeService hddsDatanodeService, RaftGroupI
this.waitOnBothFollowers = conf.getObject(
DatanodeConfiguration.class).waitOnAllFollowers();
+ this.writeChunkWaitMaxNs = conf.getTimeDuration(ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL,
+ ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_WRITE_WAIT_INTERVAL_NS_DEFAULT, TimeUnit.NANOSECONDS);
}
private void validatePeers() throws IOException {
@@ -542,6 +574,16 @@ private ContainerCommandResponseProto dispatchCommand(
private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
+ final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
+ if (previous != null) {
+ // The state machine generally waits forever; as a precaution, return the existing future if a retry arrives.
+ return previous.getRaftFuture();
+ }
+ try {
+ validateLongRunningWrite();
+ } catch (StorageContainerException e) {
+ return completeExceptionally(e);
+ }
final WriteChunkRequestProto write = requestProto.getWriteChunk();
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
@@ -564,19 +606,22 @@ private CompletableFuture<Message> writeStateMachineData(
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
- // ensure the write chunk happens asynchronously in writeChunkExecutor pool
- // thread.
- CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
- CompletableFuture.supplyAsync(() -> {
+ // ensure the write chunk happens asynchronously in writeChunkExecutor pool thread.
+ Future<ContainerCommandResponseProto> future = getChunkExecutor(
+ requestProto.getWriteChunk()).submit(() -> {
try {
try {
checkContainerHealthy(write.getBlockID().getContainerID(), true);
} catch (StorageContainerException e) {
- return ContainerUtils.logAndReturnError(LOG, e, requestProto);
+ ContainerCommandResponseProto result = ContainerUtils.logAndReturnError(LOG, e, requestProto);
+ handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+ return result;
}
metrics.recordWriteStateMachineQueueingLatencyNs(
Time.monotonicNowNanos() - startTime);
- return dispatchCommand(requestProto, context);
+ ContainerCommandResponseProto result = dispatchCommand(requestProto, context);
+ handleCommandResult(requestProto, entryIndex, startTime, result, write, raftFuture);
+ return result;
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
@@ -588,55 +633,87 @@ private CompletableFuture<Message> writeStateMachineData(
stateMachineHealthy.set(false);
raftFuture.completeExceptionally(e);
throw e;
+ } finally {
+ // Remove the future from writeChunkFutureMap once it finishes execution.
+ writeChunkFutureMap.remove(entryIndex);
}
- }, getChunkExecutor(requestProto.getWriteChunk()));
+ });
- writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+ writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture, startTime));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
"{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
- // Remove the future once it finishes execution from the
- // writeChunkFutureMap.
- writeChunkFuture.thenApply(r -> {
- if (r.getResult() != ContainerProtos.Result.SUCCESS
- && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
- && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
- // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
- // that should not crash the pipeline.
- && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
- StorageContainerException sce =
- new StorageContainerException(r.getMessage(), r.getResult());
- LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+ return raftFuture;
+ }
+
+ private void handleCommandResult(ContainerCommandRequestProto requestProto, long entryIndex, long startTime,
+ ContainerCommandResponseProto r, WriteChunkRequestProto write,
+ CompletableFuture<Message> raftFuture) {
+ if (r.getResult() != ContainerProtos.Result.SUCCESS
+ && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
+ && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+ // After concurrent flushes are allowed on the same key, chunk file inconsistencies can happen and
+ // that should not crash the pipeline.
+ && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
+ StorageContainerException sce =
+ new StorageContainerException(r.getMessage(), r.getResult());
+ LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
+ write.getChunkData().getChunkName() + " Error message: " +
+ r.getMessage() + " Container Result: " + r.getResult());
+ metrics.incNumWriteDataFails();
+ // If the write fails currently we mark the stateMachine as unhealthy.
+ // This leads to pipeline close. Any change in that behavior requires
+ // handling the entry for the write chunk in cache.
+ stateMachineHealthy.set(false);
+ unhealthyContainers.add(write.getBlockID().getContainerID());
+ raftFuture.completeExceptionally(sce);
+ } else {
+ metrics.incNumBytesWrittenCount(
+ requestProto.getWriteChunk().getChunkData().getLen());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getGroupId() +
+ ": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName() + " Error message: " +
- r.getMessage() + " Container Result: " + r.getResult());
- metrics.incNumWriteDataFails();
- // If the write fails currently we mark the stateMachine as unhealthy.
- // This leads to pipeline close. Any change in that behavior requires
- // handling the entry for the write chunk in cache.
- stateMachineHealthy.set(false);
- unhealthyContainers.add(write.getBlockID().getContainerID());
- raftFuture.completeExceptionally(sce);
- } else {
- metrics.incNumBytesWrittenCount(
- requestProto.getWriteChunk().getChunkData().getLen());
- if (LOG.isDebugEnabled()) {
- LOG.debug(getGroupId() +
- ": writeChunk writeStateMachineData completed: blockId" +
- write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
- write.getChunkData().getChunkName());
- }
- raftFuture.complete(r::toByteString);
- metrics.recordWriteStateMachineCompletionNs(
- Time.monotonicNowNanos() - startTime);
+ write.getChunkData().getChunkName());
}
+ raftFuture.complete(r::toByteString);
+ metrics.recordWriteStateMachineCompletionNs(
+ Time.monotonicNowNanos() - startTime);
+ }
+ }
- writeChunkFutureMap.remove(entryIndex);
- return r;
- });
- return raftFuture;
+ private void validateLongRunningWrite() throws StorageContainerException {
+ // get min valid write chunk operation's future context
+ Map.Entry<Long, WriteFutures> writeFutureContextEntry = null;
+ for (boolean found = false; !found;) {
+ writeFutureContextEntry = writeChunkFutureMap.firstEntry();
+ if (null == writeFutureContextEntry) {
+ return;
+ }
+ if (writeFutureContextEntry.getValue().getWriteChunkFuture().isDone()) {
+ // writeChunkFutureMap may hold a dangling completed entry, since the task's remove can run before the future is added
+ writeChunkFutureMap.remove(writeFutureContextEntry.getKey());
+ } else {
+ found = true;
+ }
+ }
+ // validate the oldest write's wait time against the timeout, in nanoseconds
+ long waitTime = Time.monotonicNowNanos() - writeFutureContextEntry.getValue().getStartTime();
+ if (waitTime > writeChunkWaitMaxNs) {
+ LOG.error("Write chunk has taken {}ns crossing threshold {}ns for index {} groupId {}, " +
+ "cancelling pending write chunk for this group", waitTime, writeChunkWaitMaxNs,
+ writeFutureContextEntry.getKey(), getGroupId());
+ stateMachineHealthy.set(false);
+ writeChunkFutureMap.forEach((key, value) -> {
+ value.getWriteChunkFuture().cancel(true);
+ });
+ throw new StorageContainerException("Write chunk has taken " + waitTime + "ns crossing threshold "
+ + writeChunkWaitMaxNs + "ns for index " + writeFutureContextEntry.getKey() + " groupId " + getGroupId(),
+ ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+ }
}
private StateMachine.DataChannel getStreamDataChannel(
@@ -819,9 +896,13 @@ private ByteString readStateMachineData(
*/
@Override
public CompletableFuture<Void> flush(long index) {
- return CompletableFuture.allOf(
- writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
- .map(Map.Entry::getValue).toArray(CompletableFuture[]::new));
+ final SortedMap<Long, WriteFutures> head = writeChunkFutureMap.headMap(index, true);
+ if (head.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.allOf(head.values().stream()
+ .map(WriteFutures::getRaftFuture)
+ .toArray(CompletableFuture[]::new));
}
/**
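
The heart of this change: in-flight writes are now tracked in a ConcurrentSkipListMap keyed by log index, so firstEntry() yields the oldest write, validateLongRunningWrite can fail fast and cancel everything once that write exceeds the configured threshold, and flush(index) can use headMap instead of scanning the whole map. A stripped-down sketch of the watchdog pattern under those assumptions (names are illustrative, not the ContainerStateMachine API):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

// Track in-flight futures by monotonically increasing index; before
// admitting new work, check how long the oldest unfinished one has run.
public class InFlightWatchdog<V> {
  private static final class Entry<V> {
    final Future<V> future;
    final long startNs;
    Entry(Future<V> future, long startNs) {
      this.future = future;
      this.startNs = startNs;
    }
  }

  private final NavigableMap<Long, Entry<V>> inFlight = new ConcurrentSkipListMap<>();
  private final long maxWaitNs;

  public InFlightWatchdog(long maxWaitNs) {
    this.maxWaitNs = maxWaitNs;
  }

  public void admit(long index, Future<V> future) throws TimeoutException {
    Map.Entry<Long, Entry<V>> oldest = inFlight.firstEntry();
    // Drop entries that finished but were not yet cleaned up.
    while (oldest != null && oldest.getValue().future.isDone()) {
      inFlight.remove(oldest.getKey());
      oldest = inFlight.firstEntry();
    }
    if (oldest != null
        && System.nanoTime() - oldest.getValue().startNs > maxWaitNs) {
      // The oldest write is stuck: cancel everything and refuse new work.
      inFlight.values().forEach(e -> e.future.cancel(true));
      throw new TimeoutException("oldest in-flight entry " + oldest.getKey()
          + " exceeded " + maxWaitNs + "ns");
    }
    inFlight.put(index, new Entry<>(future, System.nanoTime()));
  }

  public void complete(long index) {
    inFlight.remove(index);
  }
}
```

Submitting the task via the executor's Future (rather than CompletableFuture.supplyAsync) is what makes cancel(true) able to interrupt a stuck write thread, which is exactly what the new testWriteTimout test below asserts.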
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 057d96204a8..4694850b936 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -493,7 +493,7 @@ private SortedMap<Long, BlockData> getBlockDataMap(long containerID,
SortedMap<Long, BlockData> resultMap = new TreeMap<>();
Token<ContainerTokenIdentifier> containerToken =
- tokenHelper.getContainerToken(new ContainerID(containerID));
+ tokenHelper.getContainerToken(ContainerID.valueOf(containerID));
Iterator<Map.Entry<DatanodeDetails, Integer>> iterator =
sourceNodeMap.entrySet().iterator();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
index 5d3c001eaf7..415a5fa58c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/TarContainerPacker.java
@@ -305,7 +305,7 @@ private byte[] innerUnpack(InputStream input, Path dbRoot, Path chunksRoot)
descriptorFileContent = readEntry(archiveInput, size);
} else {
throw new IllegalArgumentException(
- "Unknown entry in the tar file: " + "" + name);
+ "Unknown entry in the tar file: " + name);
}
entry = archiveInput.getNextEntry();
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 2d0ed82d902..ffe5a40fb49 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -69,7 +69,7 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
* @param datanodeDetails - Datanode ID.
* @return Commands to be sent to the datanode.
*/
- default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+ default List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails) {
return processHeartbeat(datanodeDetails, null);
};
@@ -80,7 +80,7 @@ default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
* heartbeating datanode.
* @return Commands to be sent to the datanode.
*/
- List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport);
/**
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
index 02e3bd3547a..ca7427db822 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/helpers/TestContainerUtils.java
@@ -95,7 +95,7 @@ public void testDatanodeIDPersistent(@TempDir File tempDir) throws Exception {
assertWriteRead(tempDir, id1);
// Add certificate serial id.
- id1.setCertSerialId("" + RandomUtils.nextLong());
+ id1.setCertSerialId(String.valueOf(RandomUtils.secure().randomLong()));
assertWriteRead(tempDir, id1);
// Read should return an empty value if file doesn't exist
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
index db64c2c16bc..22a335e1594 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestContainerStateMachine.java
@@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@@ -30,8 +31,10 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -57,6 +60,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -246,4 +250,54 @@ public void testApplyTransactionFailure(boolean failWithException) throws Execut
ContainerProtos.ContainerCommandResponseProto.parseFrom(succcesfulTransaction.getContent());
assertEquals(ContainerProtos.Result.SUCCESS, resp.getResult());
}
+
+ @Test
+ public void testWriteTimout() throws Exception {
+ RaftProtos.LogEntryProto entry = mock(RaftProtos.LogEntryProto.class);
+ when(entry.getTerm()).thenReturn(1L);
+ when(entry.getIndex()).thenReturn(1L);
+ RaftProtos.LogEntryProto entryNext = mock(RaftProtos.LogEntryProto.class);
+ when(entryNext.getTerm()).thenReturn(1L);
+ when(entryNext.getIndex()).thenReturn(2L);
+ TransactionContext trx = mock(TransactionContext.class);
+ ContainerStateMachine.Context context = mock(ContainerStateMachine.Context.class);
+ when(trx.getStateMachineContext()).thenReturn(context);
+ doAnswer(e -> {
+ try {
+ Thread.sleep(200000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw ie;
+ }
+ return null;
+ }).when(dispatcher).dispatch(any(), any());
+
+ when(context.getRequestProto()).thenReturn(ContainerProtos.ContainerCommandRequestProto.newBuilder()
+ .setCmdType(ContainerProtos.Type.WriteChunk).setWriteChunk(
+ ContainerProtos.WriteChunkRequestProto.newBuilder().setData(ByteString.copyFromUtf8("Test Data"))
+ .setBlockID(
+ ContainerProtos.DatanodeBlockID.newBuilder().setContainerID(1).setLocalID(1).build()).build())
+ .setContainerID(1)
+ .setDatanodeUuid(UUID.randomUUID().toString()).build());
+ AtomicReference<Throwable> throwable = new AtomicReference<>(null);
+ Function<Throwable, ? extends Message> throwableSetter = t -> {
+ throwable.set(t);
+ return null;
+ };
+ Field writeChunkWaitMaxNs = stateMachine.getClass().getDeclaredField("writeChunkWaitMaxNs");
+ writeChunkWaitMaxNs.setAccessible(true);
+ writeChunkWaitMaxNs.set(stateMachine, 1000_000_000);
+ CompletableFuture<Message> firstWrite = stateMachine.write(entry, trx);
+ Thread.sleep(2000);
+ CompletableFuture<Message> secondWrite = stateMachine.write(entryNext, trx);
+ firstWrite.exceptionally(throwableSetter).get();
+ assertNotNull(throwable.get());
+ assertInstanceOf(InterruptedException.class, throwable.get());
+
+ secondWrite.exceptionally(throwableSetter).get();
+ assertNotNull(throwable.get());
+ assertInstanceOf(StorageContainerException.class, throwable.get());
+ StorageContainerException sce = (StorageContainerException) throwable.get();
+ assertEquals(ContainerProtos.Result.CONTAINER_INTERNAL_ERROR, sce.getResult());
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
index c17ef1bfb62..e98be4881a5 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.JsonTestUtils;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -511,7 +512,7 @@ private JsonNode runInspectorAndGetReport(
String output = capturer.getOutput();
capturer.clearOutput();
// Check if the output is effectively empty
- if (output.trim().isEmpty()) {
+ if (StringUtils.isBlank(output)) {
return null;
}
return JsonTestUtils.readTree(output);
diff --git a/hadoop-hdds/docs/content/tools/AuditParser.md b/hadoop-hdds/docs/content/tools/AuditParser.md
index ee2acd959c5..ebc5c8ae6a8 100644
--- a/hadoop-hdds/docs/content/tools/AuditParser.md
+++ b/hadoop-hdds/docs/content/tools/AuditParser.md
@@ -40,25 +40,25 @@ UNIQUE(datetime,level,logger,user,ip,op,params,result))
Usage:
{{< highlight bash >}}
-ozone auditparser <path to db file> [COMMAND] [PARAM]
+ozone debug auditparser <path to db file> [COMMAND] [PARAM]
{{< /highlight >}}
To load an audit log to database:
{{< highlight bash >}}
-ozone auditparser <path to db file> load <path to audit log>
+ozone debug auditparser <path to db file> load <path to audit log>
{{< /highlight >}}
Load command creates the audit table described above.
To run a custom read-only query:
{{< highlight bash >}}
-ozone auditparser query