peerNodesMap;
private File omRatisSnapshotDir;
@@ -1274,6 +1277,7 @@ private RPC.Server getRpcServer(OzoneConfiguration conf) throws IOException {
BlockingService omInterService =
OzoneManagerInterService.newReflectiveBlockingService(
omInterServerProtocol);
+ this.omGateway = new OMGateway(this);
OMAdminProtocolServerSideImpl omMetadataServerProtocol =
new OMAdminProtocolServerSideImpl(this);
@@ -1569,6 +1573,10 @@ public OzoneManagerRatisServer getOmRatisServer() {
return omRatisServer;
}
+  /**
+   * Returns the {@link OMGateway} held by this OzoneManager.
+   *
+   * @return the current gateway instance. NOTE(review): the field is assigned while the RPC server is
+   *         being built, so this is presumably null before that point -- confirm against callers.
+   */
+  public OMGateway getOMGateway() {
+    return this.omGateway;
+  }
+
@VisibleForTesting
public OmRatisSnapshotProvider getOmSnapshotProvider() {
return omRatisSnapshotProvider;
@@ -2215,7 +2223,7 @@ public long getObjectIdFromTxId(long trxnId) {
}
@VisibleForTesting
- long getLastTrxnIndexForNonRatis() throws IOException {
+ public long getLastTrxnIndexForNonRatis() throws IOException {
TransactionInfo transactionInfo =
TransactionInfo.readTransactionInfo(metadataManager);
// If the OMTransactionInfo does not exist in DB, return 0 so that new incoming
@@ -2335,6 +2343,7 @@ public boolean stop() {
if (bucketUtilizationMetrics != null) {
bucketUtilizationMetrics.unRegister();
}
+ omGateway.stop();
return true;
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
@@ -3883,7 +3892,9 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
// Pause the State Machine so that no new transactions can be applied.
// This action also clears the OM Double Buffer so that if there are any
// pending transactions in the buffer, they are discarded.
- omRatisServer.getOmStateMachine().pause();
+ BaseStateMachine sm = isLeaderExecutorEnabled() ? omRatisServer.getOmBasicStateMachine()
+ : omRatisServer.getOmStateMachine();
+ sm.pause();
} catch (Exception e) {
LOG.error("Failed to stop/ pause the services. Cannot proceed with " +
"installing the new checkpoint.");
@@ -3963,7 +3974,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
time = Time.monotonicNow();
reloadOMState();
setTransactionInfo(TransactionInfo.valueOf(termIndex));
- omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+ unpauseStateMachine(term, lastAppliedIndex);
newMetadataManagerStarted = true;
LOG.info("Reloaded OM state with Term: {} and Index: {}. Spend {} ms",
term, lastAppliedIndex, Time.monotonicNow() - time);
@@ -3972,7 +3983,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
keyManager.start(configuration);
startSecretManagerIfNecessary();
startTrashEmptier(configuration);
- omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+ unpauseStateMachine(term, lastAppliedIndex);
LOG.info("OM DB is not stopped. Started services with Term: {} and " +
"Index: {}", term, lastAppliedIndex);
}
@@ -3988,8 +3999,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
omRpcServer = getRpcServer(configuration);
omRpcServer.start();
isOmRpcServerRunning = true;
- LOG.info("RPC server is re-started. Spend " +
- (Time.monotonicNow() - time) + " ms.");
+ LOG.info("RPC server is re-started. Spend " + (Time.monotonicNow() - time) + " ms.");
} catch (Exception e) {
String errorMsg = "Failed to start RPC Server.";
exitManager.exitSystem(1, errorMsg, e, LOG);
@@ -4021,6 +4031,14 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,
return newTermIndex;
}
+  /**
+   * Unpauses the state machine that matches the configured execution mode.
+   *
+   * <p>Note the argument order: this helper takes {@code (term, lastAppliedIndex)} while both state
+   * machines' {@code unpause} methods are called with {@code (lastAppliedIndex, term)}.
+   *
+   * @param term term forwarded as the second argument of {@code unpause}
+   * @param lastAppliedIndex index forwarded as the first argument of {@code unpause}
+   */
+  private void unpauseStateMachine(long term, long lastAppliedIndex) {
+    if (!isLeaderExecutorEnabled()) {
+      omRatisServer.getOmStateMachine().unpause(lastAppliedIndex, term);
+      return;
+    }
+    omRatisServer.getOmBasicStateMachine().unpause(lastAppliedIndex, term);
+  }
+
private void stopTrashEmptier() {
if (this.emptier != null) {
emptier.interrupt();
@@ -5017,7 +5035,9 @@ public ReconfigurationHandler getReconfigurationHandler() {
*/
public void awaitDoubleBufferFlush() throws InterruptedException {
if (isRatisEnabled()) {
- getOmRatisServer().getOmStateMachine().awaitDoubleBufferFlush();
+ if (!isLeaderExecutorEnabled()) {
+ getOmRatisServer().getOmStateMachine().awaitDoubleBufferFlush();
+ }
} else {
getOmServerProtocol().awaitDoubleBufferFlush();
}
@@ -5029,4 +5049,9 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION);
}
}
+
+  /**
+   * Tells whether the leader-executor code path is switched on.
+   *
+   * @return the value of {@code OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE} read from this OM's
+   *         configuration, or {@code OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT} when unset
+   */
+  public boolean isLeaderExecutorEnabled() {
+    final boolean fallback = OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE_DEFAULT;
+    return configuration.getBoolean(OMConfigKeys.OZONE_OM_LEADER_EXECUTOR_ENABLE, fallback);
+  }
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index af4d42ad68a..b2e89bf373e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.execution.OMBasicStateMachine;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -89,6 +90,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.SizeInBytes;
@@ -119,6 +121,7 @@ public final class OzoneManagerRatisServer {
private final OzoneManager ozoneManager;
private final OzoneManagerStateMachine omStateMachine;
+ private final OMBasicStateMachine omBasicStateMachine;
private final String ratisStorageDir;
private final OMPerformanceMetrics perfMetrics;
@@ -169,7 +172,17 @@ private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
LOG.info("Instantiating OM Ratis server with groupID: {} and peers: {}",
raftGroupIdStr, raftPeersStr.substring(2));
}
- this.omStateMachine = getStateMachine(conf);
+ BaseStateMachine sm = null;
+ if (ozoneManager.isLeaderExecutorEnabled()) {
+ this.omBasicStateMachine = new OMBasicStateMachine(this,
+ TracingUtil.isTracingEnabled(conf));
+ sm = this.omBasicStateMachine;
+ this.omStateMachine = null;
+ } else {
+ this.omStateMachine = getStateMachine(conf);
+ sm = this.omStateMachine;
+ this.omBasicStateMachine = null;
+ }
Parameters parameters = createServerTlsParameters(secConfig, certClient);
this.server = RaftServer.newBuilder()
@@ -177,7 +190,7 @@ private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
.setGroup(this.raftGroup)
.setProperties(serverProperties)
.setParameters(parameters)
- .setStateMachine(omStateMachine)
+ .setStateMachine(sm)
.setOption(RaftStorage.StartupOption.RECOVER)
.build();
this.serverDivision = MemoizedSupplier.valueOf(() -> {
@@ -591,6 +604,10 @@ public OzoneManagerStateMachine getOmStateMachine() {
return omStateMachine;
}
+  /**
+   * Returns the basic state machine used when the leader executor is enabled.
+   *
+   * @return the {@link OMBasicStateMachine}, or {@code null} when this server was constructed with the
+   *         leader executor disabled (in that case only the regular OM state machine is created)
+   */
+  public OMBasicStateMachine getOmBasicStateMachine() {
+    return this.omBasicStateMachine;
+  }
+
public OzoneManager getOzoneManager() {
return ozoneManager;
}
@@ -855,6 +872,9 @@ public String getRatisStorageDir() {
}
public TermIndex getLastAppliedTermIndex() {
+ // With the leader executor enabled only omBasicStateMachine is instantiated (omStateMachine is null).
+ if (ozoneManager.isLeaderExecutorEnabled()) {
+ return omBasicStateMachine.getLastAppliedTermIndex();
+ }
return omStateMachine.getLastAppliedTermIndex();
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
new file mode 100644
index 00000000000..bfe51bf4bbb
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/FollowerRequestExecutor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Request executor that pushes each submitted {@link RequestContext} through one {@link PoolExecutor}
+ * stage (configured with pool size {@value #RATIS_TASK_POOL_SIZE}).
+ *
+ * <p>When Ratis is disabled the request is applied locally via {@code OMBasicStateMachine.runCommand};
+ * otherwise it is tagged with a freshly allocated index and submitted through the OM Ratis server.
+ * In both cases the context's future is completed with the resulting response, or with an error
+ * response if anything is thrown.
+ */
+public class FollowerRequestExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(FollowerRequestExecutor.class);
+  private static final int RATIS_TASK_POOL_SIZE = 1;
+  private static final int RATIS_TASK_QUEUE_SIZE = 1000;
+  private final AtomicLong callId = new AtomicLong(0);
+  private final OzoneManager ozoneManager;
+  // Index counter supplied by the creator of this executor; only ever incremented here.
+  private final AtomicLong uniqueIndex;
+  private final PoolExecutor<RequestContext> ratisSubmitter;
+  // Only created when Ratis is disabled; null otherwise because the Ratis path does not use it.
+  private final OzoneManagerRequestHandler handler;
+
+  /**
+   * @param om the owning OzoneManager
+   * @param uniqueIndex counter from which request indexes are drawn
+   */
+  public FollowerRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
+    this.ozoneManager = om;
+    this.uniqueIndex = uniqueIndex;
+    if (!om.isRatisEnabled()) {
+      this.handler = new OzoneManagerRequestHandler(ozoneManager);
+    } else {
+      this.handler = null;
+    }
+    ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, RATIS_TASK_QUEUE_SIZE,
+        ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
+  }
+
+  /** Stops the underlying pool executor. */
+  public void stop() {
+    ratisSubmitter.stop();
+  }
+
+  /** @return the pool size this executor was configured with */
+  public int batchSize() {
+    return RATIS_TASK_POOL_SIZE;
+  }
+
+  /**
+   * Queues a request for execution.
+   *
+   * @param idx slot index handed to the pool executor
+   * @param ctx request to execute; its future is completed once processing finishes
+   * @throws InterruptedException if interrupted while handing the request to the pool
+   */
+  public void submit(int idx, RequestContext ctx) throws InterruptedException {
+    ratisSubmitter.submit(idx, ctx);
+  }
+
+  /** Pool callback: processes every queued context in order. There is no next stage, so nxtPool is unused. */
+  private void ratisSubmitCommand(Collection<RequestContext> ctxs, PoolExecutor<RequestContext> nxtPool) {
+    for (RequestContext ctx : ctxs) {
+      sendDbUpdateRequest(ctx);
+    }
+  }
+
+  /** Runs one request (locally or through Ratis) and always completes its future. */
+  private void sendDbUpdateRequest(RequestContext ctx) {
+    try {
+      if (!ozoneManager.isRatisEnabled()) {
+        OzoneManagerProtocolProtos.OMResponse response = OMBasicStateMachine.runCommand(ctx.getRequest(),
+            TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler, ozoneManager);
+        ctx.getFuture().complete(response);
+        return;
+      }
+      // TODO hack way of transferring Leader index to follower nodes to use this index
+      // need check proper way of index
+      OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
+          = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
+      reqBuilder.addIndex(uniqueIndex.incrementAndGet());
+      OzoneManagerProtocolProtos.OMRequest req = ctx.getRequest().toBuilder()
+          .setPersistDbRequest(reqBuilder.build()).build();
+      OzoneManagerProtocolProtos.OMResponse response = ozoneManager.getOmRatisServer().submitRequest(req,
+          ClientId.randomId(), callId.incrementAndGet());
+      ctx.getFuture().complete(response);
+    } catch (Throwable th) {
+      if (th instanceof InterruptedException) {
+        // Do not lose the interrupt; the pool thread decides how to react to it.
+        Thread.currentThread().interrupt();
+      }
+      LOG.warn("Failed to write, Exception occurred ", th);
+      // Pass IOExceptions through unwrapped so the status mapping sees the real exception type.
+      IOException cause = th instanceof IOException ? (IOException) th : new IOException(th);
+      ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), cause));
+    }
+  }
+
+  /** Builds a failed OMResponse carrying the status and message derived from the exception. */
+  private OzoneManagerProtocolProtos.OMResponse createErrorResponse(
+      OzoneManagerProtocolProtos.OMRequest omRequest, IOException exception) {
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponseBuilder
+        = OzoneManagerProtocolProtos.OMResponse.newBuilder()
+        .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+        .setCmdType(omRequest.getCmdType())
+        .setTraceID(omRequest.getTraceID())
+        .setSuccess(false);
+    if (exception.getMessage() != null) {
+      omResponseBuilder.setMessage(exception.getMessage());
+    }
+    return omResponseBuilder.build();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
new file mode 100644
index 00000000000..c074918a1f1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/LeaderRequestExecutor.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.RDBBatchOperation;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes OM write requests on the leader and persists the resulting DB changes.
+ *
+ * <p>A request passes through two chained {@link PoolExecutor} stages. {@code leaderExecutor} runs the
+ * request through {@code OzoneManagerRequestHandler.handleLeaderWriteRequest} and captures the DB
+ * changes as a {@code PersistDbRequest}. {@code ratisSubmitter} then merges captured changes into
+ * batches bounded by {@code ratisByteLimit} and sends each batch through the OM Ratis server (or
+ * applies it directly when Ratis is disabled). After a batch failure processing is disabled and
+ * requests are rejected until {@link #enableProcessing()} is called.
+ */
+public class LeaderRequestExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderRequestExecutor.class);
+  private static final int REQUEST_EXECUTOR_POOL_SIZE = 1;
+  private static final int REQUEST_EXECUTOR_QUEUE_SIZE = 1000;
+  private static final int RATIS_TASK_POOL_SIZE = 1;
+  private static final int RATIS_TASK_QUEUE_SIZE = 1000;
+  private static final long DUMMY_TERM = -1;
+  // Share of the Ratis log appender byte limit a batch may use; the rest is headroom for other headers.
+  private static final double RATIS_BYTE_LIMIT_FACTOR = 0.8;
+  private final AtomicLong uniqueIndex;
+  private final int ratisByteLimit;
+  private final OzoneManager ozoneManager;
+  private final PoolExecutor<RequestContext> ratisSubmitter;
+  private final PoolExecutor<RequestContext> leaderExecutor;
+  private final OzoneManagerRequestHandler handler;
+  private final AtomicBoolean isEnabled = new AtomicBoolean(true);
+
+  /**
+   * @param om the owning OzoneManager
+   * @param uniqueIndex counter from which request indexes are drawn
+   */
+  public LeaderRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
+    this.ozoneManager = om;
+    this.handler = new OzoneManagerRequestHandler(ozoneManager);
+    ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE,
+        RATIS_TASK_QUEUE_SIZE, ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
+    leaderExecutor = new PoolExecutor<>(REQUEST_EXECUTOR_POOL_SIZE, REQUEST_EXECUTOR_QUEUE_SIZE,
+        ozoneManager.getThreadNamePrefix(), this::runExecuteCommand, ratisSubmitter);
+    int limit = (int) ozoneManager.getConfiguration().getStorageSize(
+        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+        OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    // Use only 80% of the configured limit for table updates, as other headers will be added.
+    this.ratisByteLimit = (int) (limit * RATIS_BYTE_LIMIT_FACTOR);
+    this.uniqueIndex = uniqueIndex;
+  }
+
+  /** Stops both pool stages, the request stage first. */
+  public void stop() {
+    leaderExecutor.stop();
+    ratisSubmitter.stop();
+  }
+
+  /** @return the pool size of the request execution stage */
+  public int batchSize() {
+    return REQUEST_EXECUTOR_POOL_SIZE;
+  }
+
+  /** @return true while requests are being accepted */
+  public boolean isProcessing() {
+    return isEnabled.get();
+  }
+
+  /** Makes this executor reject all further requests. */
+  public void disableProcessing() {
+    isEnabled.set(false);
+  }
+
+  /** Makes this executor accept requests again. */
+  public void enableProcessing() {
+    isEnabled.set(true);
+  }
+
+  /**
+   * Queues a request for execution, or rejects it immediately when processing is disabled.
+   *
+   * @param idx slot index handed to the pool executor
+   * @param ctx request to execute; its future is completed once processing finishes
+   * @throws InterruptedException if interrupted while handing the request to the pool
+   */
+  public void submit(int idx, RequestContext ctx) throws InterruptedException {
+    if (!isEnabled.get()) {
+      rejectRequest(ctx);
+      return;
+    }
+    leaderExecutor.submit(idx, ctx);
+  }
+
+  /** Fails the request's future: leader-not-ready if the OM says so, otherwise an internal error. */
+  private void rejectRequest(RequestContext ctx) {
+    if (!ozoneManager.isLeaderReady()) {
+      String peerId = ozoneManager.isRatisEnabled() ? ozoneManager.getOmRatisServer().getRaftPeerId().toString()
+          : ozoneManager.getOMNodeId();
+      OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(peerId
+          + " is not ready to process request yet.");
+      ctx.getFuture().completeExceptionally(leaderNotReadyException);
+    } else {
+      ctx.getFuture().completeExceptionally(new OMException("Request processing is disabled due to error",
+          OMException.ResultCodes.INTERNAL_ERROR));
+    }
+  }
+
+  /** Rejects every request in the collection. */
+  private void rejectRequest(Collection<RequestContext> ctxs) {
+    ctxs.forEach(ctx -> rejectRequest(ctx));
+  }
+
+  /** First-stage pool callback: executes each queued request, rejecting those seen while disabled. */
+  private void runExecuteCommand(Collection<RequestContext> ctxs, PoolExecutor<RequestContext> nxtPool) {
+    for (RequestContext ctx : ctxs) {
+      if (!isEnabled.get()) {
+        // Keep iterating: every queued request must have its future completed, not only the first.
+        rejectRequest(ctx);
+        continue;
+      }
+      executeRequest(ctx, nxtPool);
+    }
+  }
+
+  /** Assigns an index, runs the request, then forwards it to the next stage or completes it. */
+  private void executeRequest(RequestContext ctx, PoolExecutor<RequestContext> nxtPool) {
+    OMRequest request = ctx.getRequest();
+    TermIndex termIndex = TermIndex.valueOf(DUMMY_TERM, uniqueIndex.incrementAndGet());
+    ctx.setIndex(termIndex);
+    try {
+      handleRequest(ctx, termIndex);
+    } catch (IOException e) {
+      LOG.warn("Failed to write, Exception occurred ", e);
+      ctx.setResponse(createErrorResponse(request, e));
+    } catch (Throwable e) {
+      LOG.warn("Failed to write, Exception occurred ", e);
+      ctx.setResponse(createErrorResponse(request, new IOException(e)));
+    } finally {
+      forwardOrComplete(ctx, nxtPool);
+    }
+  }
+
+  /** Sends the context to the persist stage when it produced DB changes; otherwise completes it now. */
+  private void forwardOrComplete(RequestContext ctx, PoolExecutor<RequestContext> nxtPool) {
+    if (ctx.getNextRequest() == null) {
+      handleBatchUpdateComplete(Collections.singletonList(ctx), null, null);
+      return;
+    }
+    try {
+      nxtPool.submit(0, ctx);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // The request never reached the persist stage; fail it so the caller is not left waiting.
+      handleBatchUpdateComplete(Collections.singletonList(ctx), e, null);
+    }
+  }
+
+  /**
+   * Runs the client request and records its response on the context. On success with DB changes the
+   * captured {@code PersistDbRequest} is stored as the context's next request; otherwise the request is
+   * audit-logged here.
+   */
+  private void handleRequest(RequestContext ctx, TermIndex termIndex) throws IOException {
+    OMClientRequest omClientRequest = ctx.getClientRequest();
+    try {
+      OMClientResponse omClientResponse = handler.handleLeaderWriteRequest(omClientRequest, termIndex);
+      ctx.setResponse(omClientResponse.getOMResponse());
+      if (!omClientResponse.getOMResponse().getSuccess()) {
+        OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex);
+      } else {
+        OzoneManagerProtocolProtos.PersistDbRequest.Builder nxtRequest = retrieveDbChanges(termIndex, omClientResponse);
+        if (nxtRequest != null) {
+          ctx.setNextRequest(nxtRequest);
+        } else {
+          OMAuditLogger.log(omClientRequest.getAuditBuilder(), termIndex);
+        }
+      }
+    } catch (Throwable th) {
+      OMAuditLogger.log(omClientRequest.getAuditBuilder(), omClientRequest, ozoneManager, termIndex, th);
+      throw th;
+    }
+  }
+
+  /**
+   * Writes the response's DB changes into a batch operation and copies the batch's cached
+   * transaction into a {@code PersistDbRequest}.
+   *
+   * @return the request builder, or {@code null} when the response produced no table updates
+   */
+  private OzoneManagerProtocolProtos.PersistDbRequest.Builder retrieveDbChanges(
+      TermIndex termIndex, OMClientResponse omClientResponse) throws IOException {
+    try (BatchOperation batchOperation = ozoneManager.getMetadataManager().getStore()
+        .initBatchOperation()) {
+      omClientResponse.checkAndUpdateDB(ozoneManager.getMetadataManager(), batchOperation);
+      // get db update and raise request to flush
+      OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
+          = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
+      Map<String, Map<ByteBuffer, ByteBuffer>> cachedDbTxs
+          = ((RDBBatchOperation) batchOperation).getCachedTransaction();
+      for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> tblEntry : cachedDbTxs.entrySet()) {
+        OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
+            = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
+        tblBuilder.setTableName(tblEntry.getKey());
+        for (Map.Entry<ByteBuffer, ByteBuffer> kvEntry : tblEntry.getValue().entrySet()) {
+          OzoneManagerProtocolProtos.DBTableRecord.Builder kvBuild
+              = OzoneManagerProtocolProtos.DBTableRecord.newBuilder();
+          kvBuild.setKey(ByteString.copyFrom(kvEntry.getKey()));
+          // A record without a value is emitted with only its key set.
+          if (kvEntry.getValue() != null) {
+            kvBuild.setValue(ByteString.copyFrom(kvEntry.getValue()));
+          }
+          tblBuilder.addRecords(kvBuild.build());
+        }
+        reqBuilder.addTableUpdates(tblBuilder.build());
+      }
+      if (reqBuilder.getTableUpdatesCount() == 0) {
+        return null;
+      }
+      reqBuilder.addIndex(termIndex.getIndex());
+      return reqBuilder;
+    }
+  }
+
+  /**
+   * Second-stage pool callback: merges the queued contexts' table updates into batches no larger than
+   * {@code ratisByteLimit} and sends each batch. There is no next stage, so nxtPool is unused.
+   */
+  private void ratisSubmitCommand(Collection<RequestContext> ctxs, PoolExecutor<RequestContext> nxtPool) {
+    if (!isEnabled.get()) {
+      rejectRequest(ctxs);
+      return;
+    }
+    List<RequestContext> sendList = new ArrayList<>();
+    OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder
+        = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
+    long size = 0;
+    for (RequestContext ctx : ctxs) {
+      List<OzoneManagerProtocolProtos.DBTableUpdate> tblList = ctx.getNextRequest().getTableUpdatesList();
+      int tmpSize = 0;
+      for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) {
+        tmpSize += tblUpdates.getSerializedSize();
+      }
+      // Flush only when something is pending: a first request that alone exceeds the limit must not
+      // trigger a send of an empty batch; it is sent on its own instead.
+      if ((tmpSize + size) > ratisByteLimit && !sendList.isEmpty()) {
+        // send current batched request
+        prepareAndSendRequest(sendList, reqBuilder);
+
+        // reinit and continue
+        reqBuilder = OzoneManagerProtocolProtos.PersistDbRequest.newBuilder();
+        size = 0;
+        sendList.clear();
+      }
+
+      // keep adding to batch list
+      size += tmpSize;
+      for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tblList) {
+        OzoneManagerProtocolProtos.DBTableUpdate.Builder tblBuilder
+            = OzoneManagerProtocolProtos.DBTableUpdate.newBuilder();
+        tblBuilder.setTableName(tblUpdates.getTableName());
+        tblBuilder.addAllRecords(tblUpdates.getRecordsList());
+        reqBuilder.addTableUpdates(tblBuilder.build());
+      }
+      reqBuilder.addIndex(ctx.getIndex().getIndex());
+      sendList.add(ctx);
+    }
+    if (!sendList.isEmpty()) {
+      prepareAndSendRequest(sendList, reqBuilder);
+    }
+  }
+
+  /**
+   * Wraps the batch in a PersistDb OMRequest (client id and index taken from the last context),
+   * sends it, and completes every context in the batch with success or with the failure.
+   *
+   * @param sendList non-empty list of contexts whose updates are in {@code reqBuilder}
+   */
+  private void prepareAndSendRequest(
+      List<RequestContext> sendList, OzoneManagerProtocolProtos.PersistDbRequest.Builder reqBuilder) {
+    RequestContext lastReqCtx = sendList.get(sendList.size() - 1);
+    OMRequest.Builder omReqBuilder = OMRequest.newBuilder().setPersistDbRequest(reqBuilder.build())
+        .setCmdType(OzoneManagerProtocolProtos.Type.PersistDb)
+        .setClientId(lastReqCtx.getRequest().getClientId());
+    try {
+      OMRequest reqBatch = omReqBuilder.build();
+      OMResponse dbUpdateRsp = sendDbUpdateRequest(reqBatch, lastReqCtx.getIndex());
+      if (!dbUpdateRsp.getSuccess()) {
+        throw new OMException(dbUpdateRsp.getMessage(),
+            OMException.ResultCodes.values()[dbUpdateRsp.getStatus().ordinal()]);
+      }
+      handleBatchUpdateComplete(sendList, null, dbUpdateRsp.getLeaderOMNodeId());
+    } catch (Throwable e) {
+      LOG.warn("Failed to write, Exception occurred ", e);
+      handleBatchUpdateComplete(sendList, e, null);
+    }
+  }
+
+  /** Applies the request directly when Ratis is disabled, otherwise submits it through Ratis. */
+  private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex termIndex) throws Exception {
+    if (!ozoneManager.isRatisEnabled()) {
+      return OMBasicStateMachine.runCommand(nextRequest, termIndex, handler, ozoneManager);
+    }
+    return ozoneManager.getOmRatisServer().submitRequest(nextRequest, ClientId.randomId(),
+        termIndex.getIndex());
+  }
+
+  /** Builds a failed OMResponse carrying the status and message derived from the exception. */
+  private OMResponse createErrorResponse(OMRequest omRequest, IOException exception) {
+    OMResponse.Builder omResponseBuilder = OMResponse.newBuilder()
+        .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+        .setCmdType(omRequest.getCmdType())
+        .setTraceID(omRequest.getTraceID())
+        .setSuccess(false);
+    if (exception.getMessage() != null) {
+      omResponseBuilder.setMessage(exception.getMessage());
+    }
+    return omResponseBuilder.build();
+  }
+
+  /**
+   * Finishes a batch: audit-logs and completes each context's future, then cleans the table caches
+   * for the indexes involved.
+   *
+   * @param ctxs contexts that were processed together
+   * @param th failure of the batch, or {@code null} on success; a failure also disables processing
+   * @param leaderOMNodeId leader node id to stamp on successful responses, or {@code null} to leave as is
+   */
+  private void handleBatchUpdateComplete(Collection<RequestContext> ctxs, Throwable th, String leaderOMNodeId) {
+    Map<String, List<Long>> cleanupMap = new HashMap<>();
+    for (RequestContext ctx : ctxs) {
+      if (th != null) {
+        OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getClientRequest(), ozoneManager,
+            ctx.getIndex(), th);
+        if (th instanceof IOException) {
+          ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), (IOException) th));
+        } else {
+          ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new IOException(th)));
+        }
+
+        // TODO: no-cache, remove disable processing, let every request deal with ratis failure
+        disableProcessing();
+      } else {
+        OMAuditLogger.log(ctx.getClientRequest().getAuditBuilder(), ctx.getIndex());
+        OMResponse newRsp = ctx.getResponse();
+        if (leaderOMNodeId != null) {
+          newRsp = OMResponse.newBuilder(newRsp).setLeaderOMNodeId(leaderOMNodeId).build();
+        }
+        ctx.getFuture().complete(newRsp);
+      }
+
+      // cache cleanup: remember, per touched table, the index whose cache entries can be dropped
+      if (null != ctx.getNextRequest()) {
+        List<OzoneManagerProtocolProtos.DBTableUpdate> tblList = ctx.getNextRequest().getTableUpdatesList();
+        for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdate : tblList) {
+          List<Long> epochs = cleanupMap.computeIfAbsent(tblUpdate.getTableName(), k -> new ArrayList<>());
+          epochs.add(ctx.getIndex().getIndex());
+        }
+      }
+    }
+    // TODO: no-cache, no need cleanup cache
+    for (Map.Entry<String, List<Long>> entry : cleanupMap.entrySet()) {
+      ozoneManager.getMetadataManager().getTable(entry.getKey()).cleanupCache(entry.getValue());
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
new file mode 100644
index 00000000000..fd2962ac5bf
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMBasicStateMachine.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.hadoop.hdds.utils.NettyMetrics;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.hadoop.ozone.protocolPB.RequestHandler;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.INTERNAL_ERROR;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.METADATA_ERROR;
+
+/**
+ * The OM StateMachine is the state machine for OM Ratis server. It is
+ * responsible for applying ratis committed transactions to
+ * {@link OzoneManager}.
+ */
+public class OMBasicStateMachine extends BaseStateMachine {
+
+ public static final Logger LOG = LoggerFactory.getLogger(OMBasicStateMachine.class);
+ private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+ private final OzoneManager ozoneManager;
+ private final boolean isTracingEnabled;
+ private RequestHandler handler;
+ private RaftGroupId raftGroupId;
+ private final ExecutorService installSnapshotExecutor;
+ private final AtomicInteger statePausedCount = new AtomicInteger(0);
+ private final String threadPrefix;
+ private final NettyMetrics nettyMetrics;
+ private final List> notifiers = new CopyOnWriteArrayList<>();
+
+ public OMBasicStateMachine(OzoneManagerRatisServer ratisServer, boolean isTracingEnabled) throws IOException {
+ this.isTracingEnabled = isTracingEnabled;
+ this.ozoneManager = ratisServer.getOzoneManager();
+
+ loadSnapshotInfoFromDB();
+ this.threadPrefix = ozoneManager.getThreadNamePrefix();
+
+ this.handler = new OzoneManagerRequestHandler(ozoneManager);
+
+ ThreadFactory installSnapshotThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat(threadPrefix + "InstallSnapshotThread").build();
+ this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor(installSnapshotThreadFactory);
+ this.nettyMetrics = NettyMetrics.create();
+ }
+
+ /**
+  * Starts the life cycle of this state machine and, as part of that start, initializes the base class,
+  * remembers the Raft group id and initializes the state machine storage.
+  */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId id, RaftStorage raftStorage) throws IOException {
+   getLifeCycle().startAndTransition(() -> {
+     super.initialize(server, id, raftStorage);
+     this.raftGroupId = id;
+     storage.init(raftStorage);
+   });
+ }
+
+ /**
+  * Reloads the last applied term-index from the OM DB and, if the life cycle is in the PAUSED state,
+  * calls {@link #unpause(long, long)} with that term-index.
+  */
+ @Override
+ public synchronized void reinitialize() throws IOException {
+   loadSnapshotInfoFromDB();
+   if (getLifeCycleState() != LifeCycle.State.PAUSED) {
+     return;
+   }
+   final TermIndex applied = getLastAppliedTermIndex();
+   unpause(applied.getIndex(), applied.getTerm());
+ }
+
+ /** Returns the snapshot info derived from the OzoneManager's current transaction info. */
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+   final TransactionInfo current = ozoneManager.getTransactionInfo();
+   final SnapshotInfo latest = current.toSnapshotInfo();
+   LOG.debug("Latest Snapshot Info {}", latest);
+   return latest;
+ }
+
+ /**
+  * Handles a leader change reported by Ratis: initializes the OM HA metrics for the new leader and passes
+  * the new leader's id string to every registered notifier.
+  */
+ @Override
+ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
+   final String leaderId = newLeaderId.toString();
+   // Initialize OMHAMetrics
+   ozoneManager.omHAMetricsInit(leaderId);
+   for (Consumer notifier : notifiers) {
+     notifier.accept(leaderId);
+   }
+ }
+
+ /**
+  * Registers a callback that {@code notifyLeaderChanged} invokes with the new leader's id string.
+  *
+  * @param notifier callback to add
+  */
+ public void registerLeaderNotifier(Consumer notifier) {
+   this.notifiers.add(notifier);
+ }
+ /** Called by Ratis when the term-index advances without a state machine transaction. */
+ @Override
+ public synchronized void notifyTermIndexUpdated(long currentTerm, long newIndex) {
+   final TermIndex updated = TermIndex.valueOf(currentTerm, newIndex);
+   updateLastAppliedTermIndex(updated);
+ }
+
+ @Override
+ protected synchronized boolean updateLastAppliedTermIndex(TermIndex newTermIndex) {
+ return super.updateLastAppliedTermIndex(newTermIndex);
+ }
+
+ /**
+  * Called by Ratis when the Raft configuration changes, e.g. when a newly bootstrapped OM is added.
+  * Logs the new peer list and forwards the peer ids to {@link OzoneManager#updatePeerList}.
+  *
+  * @param term term of the configuration entry (not used here)
+  * @param index index of the configuration entry (not used here)
+  * @param newRaftConfiguration the new configuration whose peers are applied
+  */
+ @Override
+ public void notifyConfigurationChanged(long term, long index,
+     RaftProtos.RaftConfigurationProto newRaftConfiguration) {
+   List<RaftProtos.RaftPeerProto> newPeers = newRaftConfiguration.getPeersList();
+   LOG.info("Received Configuration change notification from Ratis. New Peer" +
+       " list:\n{}", newPeers);
+
+   List<String> newPeerIds = new ArrayList<>(newPeers.size());
+   for (RaftProtos.RaftPeerProto raftPeerProto : newPeers) {
+     newPeerIds.add(RaftPeerId.valueOf(raftPeerProto.getId()).toString());
+   }
+   // Check and update the peer list in OzoneManager
+   ozoneManager.updatePeerList(newPeerIds);
+ }
+
+ /**
+ * Called to notify state machine about the snapshot install result.
+ * Trigger the cleanup of candidate DB dir.
+ * @param result InstallSnapshotResult
+ * @param snapshotIndex the index of installed snapshot
+ * @param peer the peer which fini
+ */
+ @Override
+ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result,
+ long snapshotIndex, RaftPeer peer) {
+ LOG.info("Receive notifySnapshotInstalled event {} for the peer: {}" +
+ " snapshotIndex: {}.", result, peer.getId(), snapshotIndex);
+ switch (result) {
+ case SUCCESS:
+ case SNAPSHOT_UNAVAILABLE:
+ // Currently, only trigger for the one who installed snapshot
+ if (ozoneManager.getOmRatisServer().getServerDivision().getPeer().equals(peer)) {
+ ozoneManager.getOmSnapshotProvider().init();
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ /**
+  * Builds the leader-side transaction context for an incoming client request.
+  * The request message is parsed into an {@link OMRequest}, which is attached as the state machine context;
+  * the raw message content becomes the log data.
+  *
+  * @return the transaction context carrying the content to be written to the log entry
+  * @throws IOException if the message content cannot be converted to an OMRequest
+  */
+ @Override
+ public TransactionContext startTransaction(RaftClientRequest raftClientRequest) throws IOException {
+   final ByteString logData = raftClientRequest.getMessage().getContent();
+   final OMRequest parsedRequest = OMRatisHelper.convertByteStringToOMRequest(logData);
+   return TransactionContext.newBuilder()
+       .setClientRequest(raftClientRequest)
+       .setStateMachine(this)
+       .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+       .setLogData(logData)
+       .setStateMachineContext(parsedRequest)
+       .build();
+ }
+
+ /** Returns the given transaction context unchanged. */
+ @Override
+ public TransactionContext preAppendTransaction(TransactionContext trx) throws IOException {
+   final TransactionContext unchanged = trx;
+   return unchanged;
+ }
+
+ /**
+  * Applies a committed log entry by running the contained OM request through {@code runCommand}.
+  * The last applied term-index is advanced in every case, including when the apply fails.
+  *
+  * @return a future already completed with the response message, or completed exceptionally on failure
+  */
+ @Override
+ public CompletableFuture applyTransaction(TransactionContext trx) {
+   final Object context = trx.getStateMachineContext();
+   final TermIndex termIndex = TermIndex.valueOf(trx.getLogEntry());
+   try {
+     final OMRequest request;
+     if (context != null) {
+       // On the leader the OMRequest was attached to the context in startTransaction.
+       request = (OMRequest) context;
+     } else {
+       // Otherwise the OMRequest has to be decoded from the log entry.
+       request = OMRatisHelper.convertByteStringToOMRequest(trx.getStateMachineLogEntry().getLogData());
+     }
+     final OMResponse response = runCommand(request, termIndex, handler, ozoneManager);
+     return CompletableFuture.completedFuture(OMRatisHelper.convertResponseToMessage(response));
+   } catch (Exception e) {
+     return completeExceptionally(e);
+   } finally {
+     updateLastAppliedTermIndex(termIndex);
+   }
+ }
+
+ private static void terminate(OMResponse omResponse, OMException.ResultCodes resultCode) {
+ OMException exception = new OMException(omResponse.getMessage(), resultCode);
+ String errorMessage = "OM Ratis Server has received unrecoverable " +
+ "error, to avoid further DB corruption, terminating OM. Error " +
+ "Response received is:" + omResponse;
+ ExitUtils.terminate(1, errorMessage, exception, LOG);
+ }
+
+ /**
+  * Serves a query by decoding the OM request from the message and passing it to the handler's
+  * read path. The request must be read-only.
+  *
+  * @return a future completed with the response message, or completed exceptionally on an IOException
+  */
+ @Override
+ public CompletableFuture query(Message request) {
+   try {
+     final OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(request.getContent());
+     final OMResponse omResponse = handler.handleReadRequest(omRequest);
+     return CompletableFuture.completedFuture(OMRatisHelper.convertResponseToMessage(omResponse));
+   } catch (IOException e) {
+     return completeExceptionally(e);
+   }
+ }
+
+ /**
+  * Counts one more pause request and moves the life cycle to PAUSED via PAUSING.
+  * No transition happens when the state is already PAUSED or still NEW; the pause counter is
+  * incremented regardless.
+  */
+ @Override
+ public synchronized void pause() {
+   LOG.info("OzoneManagerStateMachine is pausing");
+   statePausedCount.incrementAndGet();
+   final LifeCycle.State current = getLifeCycleState();
+   final boolean alreadyPaused = current == LifeCycle.State.PAUSED;
+   final boolean notStarted = current == LifeCycle.State.NEW;
+   if (alreadyPaused || notStarted) {
+     return;
+   }
+   getLifeCycle().transition(LifeCycle.State.PAUSING);
+   getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+
+ /**
+  * Releases one pause request. When the pause counter drops to zero the life cycle is started again and
+  * the last applied term-index is set from the given values. This should be done after uploading new
+  * state to the StateMachine.
+  *
+  * @param newLastAppliedSnaphsotIndex index to set as last applied
+  * @param newLastAppliedSnapShotTermIndex term to set as last applied
+  */
+ public synchronized void unpause(long newLastAppliedSnaphsotIndex,
+     long newLastAppliedSnapShotTermIndex) {
+   LOG.info("OzoneManagerStateMachine is un-pausing");
+   final int remainingPauses = statePausedCount.decrementAndGet();
+   if (remainingPauses != 0) {
+     return;
+   }
+   getLifeCycle().startAndTransition(() -> {
+     final TermIndex resumeAt = TermIndex.valueOf(newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex);
+     this.setLastAppliedTermIndex(resumeAt);
+   });
+ }
+
+ /**
+  * Records the last applied term-index as the snapshot point.
+  *
+  * <p>The last applied term-index is combined with the index currently stored in the transaction info
+  * table, set on the OzoneManager, written back to the transaction info table and flushed to the DB.
+  *
+  * @return the last applied index on the state machine
+  * @throws IOException if the OzoneManager is already stopped or the DB access fails
+  */
+ @Override
+ public long takeSnapshot() throws IOException {
+   if (ozoneManager.isStopped()) {
+     throw new IOException("OzoneManager is already stopped: " + ozoneManager.getNodeDetails());
+   }
+   final TermIndex applied = getLastAppliedTermIndex();
+   // readTransactionInfo may yield null when the DB holds no TransactionInfo yet (other methods of this
+   // class null-check it); leave the index unset in that case instead of failing with a NullPointerException.
+   final TransactionInfo persisted = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager());
+   final Long index = persisted != null ? persisted.getIndex() : null;
+   final TransactionInfo transactionInfo = TransactionInfo.valueOf(applied, index);
+   ozoneManager.setTransactionInfo(transactionInfo);
+   ozoneManager.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY, transactionInfo);
+   ozoneManager.getMetadataManager().getStore().flushDB();
+   return applied.getIndex();
+ }
+
+ /**
+  * Called by Ratis when the leader OM has purged log entries this OM still needs. The snapshot
+  * installation from the leader runs asynchronously on the install-snapshot executor.
+  *
+  * @param roleInfoProto the leader node information
+  * @param firstTermIndexInLog TermIndex of the first append entry available in the Leader's log
+  * @return future holding the result of {@code installSnapshotFromLeader}
+  */
+ @Override
+ public CompletableFuture notifyInstallSnapshotFromLeader(
+     RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
+   final RaftPeerId leaderPeerId = RaftPeerId.valueOf(
+       roleInfoProto.getFollowerInfo().getLeaderInfo().getId().getId());
+   final String leaderNodeId = leaderPeerId.toString();
+   LOG.info("Received install snapshot notification from OM leader: {} with term index: {}",
+       leaderNodeId, firstTermIndexInLog);
+   return CompletableFuture.supplyAsync(
+       () -> ozoneManager.installSnapshotFromLeader(leaderNodeId), installSnapshotExecutor);
+ }
+
+ /** Renders a state machine log entry as a string using {@code OMRatisHelper.smProtoToString}. */
+ @Override
+ public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
+   final String rendered = OMRatisHelper.smProtoToString(proto);
+   return rendered;
+ }
+
+ /**
+  * Closes the state machine. If the OzoneManager is already stopped only this state machine is stopped;
+  * otherwise the OzoneManager is shut down.
+  */
+ @Override
+ public void close() {
+   if (ozoneManager.isStopped()) {
+     LOG.info("Stopping {}.", this);
+     stop();
+     return;
+   }
+   // OM should be shutdown as the StateMachine has shutdown.
+   LOG.info("Stopping {}. Shutdown also OzoneManager {}.", this, ozoneManager);
+   ozoneManager.shutDown("OM state machine is shutdown by Ratis server");
+ }
+
+ /**
+ * Executes a write request through the handler, persists its effect together with the
+ * transaction info, and returns the resulting OM response.
+ *
+ * An IOException from the handler is turned into an error response and only the transaction
+ * info is written. Any other failure, and any INTERNAL_ERROR / METADATA_ERROR response status,
+ * terminates the OM via ExitUtils.
+ *
+ * @param request OMRequest to apply
+ * @param termIndex Ratis term-index of the log entry being applied
+ * @param handler handler used to execute the write request
+ * @param om OzoneManager whose metadata manager and DB store are updated
+ * @return response from OM; null is returned only if ExitUtils.terminate returns normally
+ */
+ public static OMResponse runCommand(
+ OMRequest request, TermIndex termIndex, RequestHandler handler, OzoneManager om) {
+ OMClientResponse omClientResponse = null;
+ try {
+ // Start from the index persisted in the DB, or 0 when none is stored yet.
+ long index = 0;
+ TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(om.getMetadataManager());
+ if (null != transactionInfo && null != transactionInfo.getIndex()) {
+ index = transactionInfo.getIndex();
+ }
+ try {
+ // A PersistDb request may carry indices; never let the tracked index go below their maximum.
+ if (request.hasPersistDbRequest() && request.getPersistDbRequest().getIndexCount() > 0) {
+ index = Math.max(Collections.max(request.getPersistDbRequest().getIndexList()).longValue(), index);
+ }
+ TermIndex objectIndex = termIndex;
+ // TODO temp fix for index sharing from leader to follower in follower execution
+ // All command types except PersistDb and Prepare get (term, tracked index) instead of the Ratis term-index.
+ if (request.getCmdType() != OzoneManagerProtocolProtos.Type.PersistDb
+ && request.getCmdType() != OzoneManagerProtocolProtos.Type.Prepare) {
+ objectIndex = TermIndex.valueOf(termIndex.getTerm(), index);
+ }
+ omClientResponse = handler.handleWriteRequestImpl(request, objectIndex);
+ validateResponseError(omClientResponse.getOMResponse());
+ } catch (IOException e) {
+ LOG.warn("Failed to apply command, Exception occurred ", e);
+ // Replace the response with an error response; being a DummyOMClientResponse it skips the batch write below.
+ omClientResponse = new DummyOMClientResponse(createErrorResponse(request, e, termIndex));
+ validateResponseError(omClientResponse.getOMResponse());
+ // Still record the term-index and tracked index for the failed request.
+ om.getMetadataManager().getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
+ TransactionInfo.valueOf(termIndex, index));
+ }
+
+ if (!(omClientResponse instanceof DummyOMClientResponse)) {
+ // need perform db operation for other request (not for PersistDB request)
+ // The response's DB changes and the transaction info are committed in one batch.
+ try (BatchOperation batchOperation = om.getMetadataManager().getStore().initBatchOperation()) {
+ omClientResponse.checkAndUpdateDB(om.getMetadataManager(), batchOperation);
+ om.getMetadataManager().getTransactionInfoTable().putWithBatch(
+ batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(termIndex, index));
+ om.getMetadataManager().getStore().commitBatchOperation(batchOperation);
+ }
+ }
+ return omClientResponse.getOMResponse();
+ } catch (Throwable e) {
+ // For any further exceptions, terminate OM as db update fails
+ String errorMessage = "Request " + request + " failed with exception";
+ ExitUtils.terminate(1, errorMessage, e, LOG);
+ }
+ // Reached only when ExitUtils.terminate above returns normally.
+ return null;
+ }
+
+ private static void validateResponseError(OMResponse omResponse) {
+ if (omResponse.getStatus() == INTERNAL_ERROR) {
+ terminate(omResponse, OMException.ResultCodes.INTERNAL_ERROR);
+ } else if (omResponse.getStatus() == METADATA_ERROR) {
+ terminate(omResponse, OMException.ResultCodes.METADATA_ERROR);
+ }
+ }
+
+ private static OMResponse createErrorResponse(
+ OMRequest omRequest, IOException exception, TermIndex termIndex) {
+ OMResponse.Builder omResponseBuilder = OMResponse.newBuilder()
+ .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+ .setCmdType(omRequest.getCmdType())
+ .setTraceID(omRequest.getTraceID())
+ .setSuccess(false);
+ if (exception.getMessage() != null) {
+ omResponseBuilder.setMessage(exception.getMessage());
+ }
+ OMResponse omResponse = omResponseBuilder.build();
+ return omResponse;
+ }
+
+ /**
+  * Reads the transaction info from the OM DB and, when present, uses it to set the last applied
+  * term-index of this state machine and the transaction info of the OzoneManager.
+  *
+  * @throws IOException if the transaction info cannot be read
+  */
+ public void loadSnapshotInfoFromDB() throws IOException {
+   // This is done, as we have a check in Ratis for not throwing LeaderNotReadyException,
+   // it checks stateMachineIndex >= raftLog nextIndex (placeHolderIndex).
+   final TransactionInfo persisted = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager());
+   if (persisted == null) {
+     LOG.info("TransactionInfo not found in OM DB.");
+     return;
+   }
+   final TermIndex ti = persisted.getTermIndex();
+   setLastAppliedTermIndex(ti);
+   ozoneManager.setTransactionInfo(persisted);
+   LOG.info("LastAppliedIndex is set from TransactionInfo from OM DB as {}", ti);
+ }
+
+ /** Returns a new future that is already completed exceptionally with the given exception. */
+ private static CompletableFuture completeExceptionally(Exception e) {
+   final CompletableFuture failed = new CompletableFuture<>();
+   failed.completeExceptionally(e);
+   return failed;
+ }
+
+ /** Replaces the request handler; intended for tests. */
+ @VisibleForTesting
+ public void setHandler(OzoneManagerRequestHandler handler) {
+   this.handler = handler;
+ }
+
+ /** Returns the current request handler cast to {@link OzoneManagerRequestHandler}; intended for tests. */
+ @VisibleForTesting
+ public OzoneManagerRequestHandler getHandler() {
+   final OzoneManagerRequestHandler current = (OzoneManagerRequestHandler) this.handler;
+   return current;
+ }
+
+ /** Shuts down the install-snapshot executor (5 second timeout) and unregisters the Netty metrics. */
+ public void stop() {
+   HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+   if (this.nettyMetrics == null) {
+     return;
+   }
+   this.nettyMetrics.unregister();
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
new file mode 100644
index 00000000000..8942d044397
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/OMGateway.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.OzoneManagerPrepareState;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point for executing OM requests: validates a request and hands it to the leader or follower executor.
+ */
+public class OMGateway {
+ private static final Logger LOG = LoggerFactory.getLogger(OMGateway.class);
+ private final LeaderRequestExecutor leaderExecutor;
+ private final FollowerRequestExecutor followerExecutor;
+ private final OzoneManager om;
+ private final AtomicLong requestInProgress = new AtomicLong(0);
+ /**
+ * uniqueIndex is used to generate the index used in objectId creation, unique across OM nodes.
+ * This makes use of termIndex for init shifted within 54 bits.
+ */
+ private AtomicLong uniqueIndex = new AtomicLong();
+
+ public OMGateway(OzoneManager om) throws IOException {
+ this.om = om;
+ this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex);
+ this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex);
+ if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) {
+ OzoneManagerRatisServer ratisServer = om.getOmRatisServer();
+ ratisServer.getOmBasicStateMachine().registerLeaderNotifier(this::leaderChangeNotifier);
+ TransactionInfo transactionInfo = om.getTransactionInfo();
+ if (transactionInfo != null) {
+ if (transactionInfo.getIndex() != null) {
+ uniqueIndex.set(transactionInfo.getIndex());
+ } else if (transactionInfo.getTransactionIndex() >= 0) {
+ uniqueIndex.set(transactionInfo.getTransactionIndex());
+ }
+ }
+ } else {
+ // for non-ratis flow, init with last index
+ uniqueIndex.set(om.getLastTrxnIndexForNonRatis());
+ }
+ }
+ /** Stops the leader executor and then the follower executor. */
+ public void stop() {
+   this.leaderExecutor.stop();
+   this.followerExecutor.stop();
+ }
+ /**
+  * Validates a request, hands it to the matching executor and blocks until its response is available.
+  *
+  * <p>Requests selected for the follower executor go there; every other executor type is served by the
+  * leader executor. Any failure before or during submission completes the request's future exceptionally
+  * and therefore surfaces as a {@link ServiceException} from this method.
+  *
+  * @param omRequest the request to execute
+  * @return the response the request's future was completed with
+  * @throws ServiceException if the leader is not ready, processing is disabled, the request fails, or the
+  *     waiting thread is interrupted
+  */
+ public OMResponse submit(OMRequest omRequest) throws ServiceException {
+   if (!om.isLeaderReady()) {
+     String peerId = om.isRatisEnabled() ? om.getOmRatisServer().getRaftPeerId().toString() : om.getOMNodeId();
+     OMLeaderNotReadyException leaderNotReadyException = new OMLeaderNotReadyException(peerId
+         + " is not ready to process request yet.");
+     throw new ServiceException(leaderNotReadyException);
+   }
+   executorEnable();
+   RequestContext requestContext = new RequestContext();
+   requestContext.setRequest(omRequest);
+   requestInProgress.incrementAndGet();
+   requestContext.setFuture(new CompletableFuture<>());
+   // The in-progress counter is decremented by handleAfterExecution once the future completes.
+   CompletableFuture<OMResponse> f = requestContext.getFuture()
+       .whenComplete((r, th) -> handleAfterExecution(requestContext, th));
+   try {
+     // TODO gateway locking: take lock with OMLockDetails
+     // TODO scheduling of request to pool
+     om.checkLeaderStatus();
+     validate(omRequest);
+     OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, om);
+     requestContext.setClientRequest(omClientRequest);
+
+     // submit request: only FOLLOWER has its own executor, all other types use the leader executor
+     ExecutorType executorType = executorSelector(omRequest);
+     if (executorType == ExecutorType.FOLLOWER) {
+       followerExecutor.submit(0, requestContext);
+     } else {
+       leaderExecutor.submit(0, requestContext);
+     }
+   } catch (InterruptedException e) {
+     requestContext.getFuture().completeExceptionally(e);
+     Thread.currentThread().interrupt();
+   } catch (Throwable e) {
+     requestContext.getFuture().completeExceptionally(e);
+   }
+   try {
+     return f.get();
+   } catch (ExecutionException ex) {
+     // NOTE(review): the ServiceException's cause is the ExecutionException, not the original failure;
+     // confirm callers that inspect the cause expect this wrapping.
+     throw new ServiceException(ex.getMessage(), ex);
+   } catch (InterruptedException ex) {
+     Thread.currentThread().interrupt();
+     throw new ServiceException(ex.getMessage(), ex);
+   }
+ }
+
+ private void validate(OMRequest omRequest) throws IOException {
+ OzoneManagerRequestHandler.requestParamValidation(omRequest);
+ // validate prepare state
+ OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
+ OzoneManagerPrepareState prepareState = om.getPrepareState();
+ if (cmdType == OzoneManagerProtocolProtos.Type.Prepare) {
+ // Must authenticate prepare requests here, since we must determine
+ // whether or not to apply the prepare gate before proceeding with the
+ // prepare request.
+ UserGroupInformation userGroupInformation =
+ UserGroupInformation.createRemoteUser(omRequest.getUserInfo().getUserName());
+ if (om.getAclsEnabled() && !om.isAdmin(userGroupInformation)) {
+ String message = "Access denied for user " + userGroupInformation + ". "
+ + "Superuser privilege is required to prepare ozone managers.";
+ throw new OMException(message, OMException.ResultCodes.ACCESS_DENIED);
+ } else {
+ prepareState.enablePrepareGate();
+ }
+ }
+
+ // In prepare mode, only prepare and cancel requests are allowed to go
+ // through.
+ if (!prepareState.requestAllowed(cmdType)) {
+ String message = "Cannot apply write request " +
+ omRequest.getCmdType().name() + " when OM is in prepare mode.";
+ throw new OMException(message, OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED);
+ }
+ }
+ /** Runs when a request's future completes, successfully or not: one request is no longer in progress. */
+ private void handleAfterExecution(RequestContext ctx, Throwable th) {
+   // TODO: gateway locking: release lock and OMLockDetails update
+   this.requestInProgress.decrementAndGet();
+ }
+
+ /**
+  * Reacts to a leader change. When this OM is the new leader the caches are cleaned up and the unique
+  * index is reset; otherwise processing on the leader executor is disabled.
+  *
+  * @param newLeaderId id of the new leader, compared against this OM's node id
+  */
+ public void leaderChangeNotifier(String newLeaderId) {
+   final boolean thisNodeLeads = om.getOMNodeId().equals(newLeaderId);
+   if (!thisNodeLeads) {
+     leaderExecutor.disableProcessing();
+     return;
+   }
+   cleanupCache();
+   resetUniqueIndex();
+ }
+
+ /**
+  * Re-seeds {@code uniqueIndex} from the index stored in the transaction info table.
+  * Nothing changes when no transaction info or no index is stored.
+  *
+  * @throws IllegalStateException if the transaction info cannot be read; the IOException is kept as cause
+  */
+ private void resetUniqueIndex() {
+   Long index = null;
+   try {
+     TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(om.getMetadataManager());
+     if (null != transactionInfo) {
+       index = transactionInfo.getIndex();
+     }
+   } catch (IOException e) {
+     // Chain the IOException so the underlying DB failure is not lost from the stack trace.
+     throw new IllegalStateException("Unable to initialized index from TransactionInfoTable", e);
+   }
+   if (null != index) {
+     uniqueIndex.set(index);
+   }
+ }
+
+ /**
+  * Re-synchronizes the bucket table cache and then the volume table cache with the DB contents.
+  *
+  * @throws IOException if iterating a table fails
+  */
+ private void rebuildBucketVolumeCache() throws IOException {
+   LOG.info("Rebuild of bucket and volume cache");
+   Table<String, OmBucketInfo> bucketTable = om.getMetadataManager().getBucketTable();
+   rebuildTableCache(bucketTable);
+
+   Table<String, OmVolumeArgs> volumeTable = om.getMetadataManager().getVolumeTable();
+   rebuildTableCache(volumeTable);
+ }
+
+ /**
+  * Adds a cache entry (epoch -1) for every row of the table, then adds a key-only cache entry (epoch -1)
+  * for every key that was in the cache before but has no row in the DB.
+  */
+ private static <V> void rebuildTableCache(Table<String, V> table) throws IOException {
+   // Keys currently cached; whatever remains after the DB scan is an extra cache entry.
+   Set<String> cachedKeySet = new HashSet<>();
+   Iterator<Map.Entry<CacheKey<String>, CacheValue<V>>> cacheItr = table.cacheIterator();
+   while (cacheItr.hasNext()) {
+     cachedKeySet.add(cacheItr.next().getKey().getCacheKey());
+   }
+   try (TableIterator<String, ? extends Table.KeyValue<String, V>> dbItr = table.iterator()) {
+     while (dbItr.hasNext()) {
+       Table.KeyValue<String, V> next = dbItr.next();
+       table.addCacheEntry(next.getKey(), next.getValue(), -1);
+       cachedKeySet.remove(next.getKey());
+     }
+   }
+
+   // removing extra cache entry
+   for (String key : cachedKeySet) {
+     table.addCacheEntry(key, -1);
+   }
+ }
+
+ /**
+  * Cleans up the cache of every typed table for all of its current epochs and then rebuilds the
+  * bucket and volume caches from the DB. The rebuild is retried once; a second failure terminates the OM.
+  */
+ public void cleanupCache() {
+   // TODO no-cache case, no need re-build bucket/volume cache and cleanup of cache
+   LOG.debug("clean all table cache and update bucket/volume with db");
+   for (String tbl : om.getMetadataManager().listTableNames()) {
+     Table table = om.getMetadataManager().getTable(tbl);
+     if (table instanceof TypedTable) {
+       ArrayList<Long> epochs = new ArrayList<>(((TypedTable<?, ?>) table).getCache().getEpochEntries().keySet());
+       if (!epochs.isEmpty()) {
+         table.cleanupCache(epochs);
+       }
+     }
+   }
+   try {
+     rebuildBucketVolumeCache();
+   } catch (IOException e) {
+     // retry once, else om down
+     LOG.warn("Rebuild of bucket and volume cache failed, retrying once", e);
+     try {
+       rebuildBucketVolumeCache();
+     } catch (IOException ex) {
+       String errorMessage = "OM unable to access rocksdb, terminating OM. Error " + ex.getMessage();
+       ExitUtils.terminate(1, errorMessage, ex, LOG);
+     }
+   }
+ }
+ public void executorEnable() throws ServiceException {
+ if (leaderExecutor.isProcessing()) {
+ return;
+ }
+ if (requestInProgress.get() == 0) {
+ cleanupCache();
+ leaderExecutor.enableProcessing();
+ } else {
+ LOG.warn("Executor is not enabled, previous request {} is still not cleaned", requestInProgress.get());
+ String msg = "Request processing is disabled due to error";
+ throw new ServiceException(msg, new OMException(msg, OMException.ResultCodes.INTERNAL_ERROR));
+ }
+ }
+
+ private ExecutorType executorSelector(OMRequest req) {
+ switch (req.getCmdType()) {
+ case EchoRPC:
+ return ExecutorType.LEADER_OPTIMIZED;
+ /* cases with Secret manager cache */
+ case GetS3Secret:
+ case SetS3Secret:
+ case RevokeS3Secret:
+ case TenantAssignUserAccessId:
+ case TenantRevokeUserAccessId:
+ case TenantAssignAdmin:
+ case TenantRevokeAdmin:
+ /* cases for upgrade */
+ case FinalizeUpgrade:
+ case Prepare:
+ case CancelPrepare:
+ /* cases for snapshot db update */
+ case PurgeKeys:
+ case PurgeDirectories:
+ case RenameKey:
+ case RenameKeys:
+ /* cases for snapshot */
+ case SnapshotMoveDeletedKeys:
+ case SnapshotPurge:
+ case SetSnapshotProperty:
+ case CreateSnapshot:
+ case DeleteSnapshot:
+ case RenameSnapshot:
+ return ExecutorType.FOLLOWER;
+ default:
+ return ExecutorType.LEADER_COMPATIBLE;
+ }
+ }
+
+ /** Executor categories a request's command type can be mapped to. */
+ enum ExecutorType {
+   /** Default category for command types not listed explicitly. */
+   LEADER_COMPATIBLE,
+   /** Category for the explicitly listed secret/tenant, upgrade and snapshot related commands. */
+   FOLLOWER,
+   /** Category used for EchoRPC. */
+   LEADER_OPTIMIZED
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
new file mode 100644
index 00000000000..fe9ae728fae
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/PoolExecutor.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+/**
+ * Pool of worker threads in which every worker owns one bounded queue.
+ *
+ * <p>Tasks are submitted to a worker by index. A worker blocks until a task
+ * arrives, drains any further tasks already waiting in its queue, and passes
+ * the resulting batch to the handler together with the next pool.
+ */
+public class PoolExecutor {
+  /** Queue capacity used when the requested size is not positive. */
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+
+  private final Thread[] threadPool;
+  private final List> queues;
+  private BiConsumer, PoolExecutor> handler = null;
+  private PoolExecutor nxtPool;
+  private final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+  /**
+   * Creates the queues and starts one worker thread per queue.
+   *
+   * @param poolSize number of worker threads (and queues)
+   * @param queueSize capacity of each worker queue; non-positive values fall
+   *     back to {@link #DEFAULT_QUEUE_SIZE}
+   * @param threadPrefix prefix for the worker thread names
+   */
+  private PoolExecutor(int poolSize, int queueSize, String threadPrefix) {
+    // LinkedBlockingQueue rejects a non-positive capacity, so fall back to the default there.
+    final int capacity = queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE;
+    threadPool = new Thread[poolSize];
+    queues = new ArrayList<>(poolSize);
+    for (int i = 0; i < poolSize; ++i) {
+      LinkedBlockingQueue queue = new LinkedBlockingQueue<>(capacity);
+      queues.add(queue);
+      threadPool[i] = new Thread(() -> execute(queue), threadPrefix + "OMExecutor-" + i);
+      threadPool[i].start();
+    }
+  }
+
+  /**
+   * Creates a started pool whose workers pass each drained batch to the handler.
+   *
+   * @param poolSize number of worker threads (and queues)
+   * @param queueSize capacity of each worker queue
+   * @param threadPrefix prefix for the worker thread names
+   * @param handler callback invoked with each batch of tasks and {@code nxtPool}
+   * @param nxtPool pool handed to the handler on every invocation
+   */
+  public PoolExecutor(
+      int poolSize, int queueSize, String threadPrefix, BiConsumer, PoolExecutor> handler,
+      PoolExecutor nxtPool) {
+    this(poolSize, queueSize, threadPrefix);
+    this.handler = handler;
+    this.nxtPool = nxtPool;
+  }
+
+  /**
+   * Queues a task for the worker at the given index, blocking while that
+   * worker's queue is full.
+   *
+   * <p>An index outside the pool is ignored and the task is dropped without
+   * any error.
+   *
+   * @param idx index of the target worker
+   * @param task task to queue
+   * @throws InterruptedException if interrupted while waiting for queue space
+   */
+  public void submit(int idx, T task) throws InterruptedException {
+    if (idx < 0 || idx >= threadPool.length) {
+      return;
+    }
+    queues.get(idx).put(task);
+  }
+
+  /**
+   * Worker loop: waits for a task, drains the rest of the queue into the same
+   * batch, and hands the batch to the handler. Ends when the pool is stopped
+   * or the thread is interrupted.
+   */
+  private void execute(BlockingQueue q) {
+    while (isRunning.get()) {
+      try {
+        List entries = new LinkedList<>();
+        T task = q.take();
+        entries.add(task);
+        q.drainTo(entries);
+        handler.accept(entries, nxtPool);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+  }
+
+  /**
+   * Stops all workers and waits for them to finish. Tasks still queued are
+   * not processed.
+   */
+  public void stop() {
+    // Clear the flag first so a worker that is busy in the handler (and may
+    // consume the interrupt there) still leaves its loop afterwards.
+    isRunning.set(false);
+    for (Thread worker : threadPool) {
+      worker.interrupt();
+    }
+    for (Thread worker : threadPool) {
+      try {
+        worker.join();
+      } catch (InterruptedException e) {
+        // Every worker has already been interrupted; further joins would
+        // fail immediately, so restore the flag and stop waiting.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java
new file mode 100644
index 00000000000..31994a06e3c
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/RequestContext.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.ratis.server.protocol.TermIndex;
+
+/**
+ * Request processing information.
+ *
+ * <p>Plain mutable holder: every field starts out unset ({@code null}) and is
+ * filled in through its setter. The class does no synchronization itself.
+ */
+public final class RequestContext {
+  private OMRequest request;
+  private OMClientRequest clientRequest;
+  private OMResponse response;
+  private TermIndex index;
+  private CompletableFuture future;
+  private OzoneManagerProtocolProtos.PersistDbRequest.Builder nextRequest;
+
+  /** @return the OM request held by this context, or {@code null} if unset */
+  public OMRequest getRequest() {
+    return request;
+  }
+
+  /** @param request the OM request to hold */
+  public void setRequest(OMRequest request) {
+    this.request = request;
+  }
+
+  /** @return the client request object held by this context, or {@code null} if unset */
+  public OMClientRequest getClientRequest() {
+    return clientRequest;
+  }
+
+  /** @param clientRequest the client request object to hold */
+  public void setClientRequest(OMClientRequest clientRequest) {
+    this.clientRequest = clientRequest;
+  }
+
+  /** @return the OM response held by this context, or {@code null} if unset */
+  public OMResponse getResponse() {
+    return response;
+  }
+
+  /** @param response the OM response to hold */
+  public void setResponse(OMResponse response) {
+    this.response = response;
+  }
+
+  /** @return the term and index held by this context, or {@code null} if unset */
+  public TermIndex getIndex() {
+    return index;
+  }
+
+  /** @param index the term and index to hold */
+  public void setIndex(TermIndex index) {
+    this.index = index;
+  }
+
+  /** @return the future held by this context, or {@code null} if unset */
+  public CompletableFuture getFuture() {
+    return future;
+  }
+
+  /** @param future the future to hold */
+  public void setFuture(CompletableFuture future) {
+    this.future = future;
+  }
+
+  /** @return the PersistDb request builder held by this context, or {@code null} if unset */
+  public OzoneManagerProtocolProtos.PersistDbRequest.Builder getNextRequest() {
+    return nextRequest;
+  }
+
+  /** @param nextRequest the PersistDb request builder to hold */
+  public void setNextRequest(OzoneManagerProtocolProtos.PersistDbRequest.Builder nextRequest) {
+    this.nextRequest = nextRequest;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/package-info.java
new file mode 100644
index 00000000000..f91f574c641
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/execution/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes for the OM execution implementation.
+ */
+package org.apache.hadoop.ozone.om.ratis.execution;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index ffaedaa06a9..b931b4fcac2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
import org.apache.hadoop.ozone.om.request.BucketLayoutAwareOMKeyRequestFactory;
+import org.apache.hadoop.ozone.om.request.OMPersistDbRequest;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketCreateRequest;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketDeleteRequest;
import org.apache.hadoop.ozone.om.request.bucket.OMBucketSetOwnerRequest;
@@ -337,6 +338,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest,
return new S3ExpiredMultipartUploadsAbortRequest(omRequest);
case QuotaRepair:
return new OMQuotaRepairRequest(omRequest);
+ case PersistDb:
+ return new OMPersistDbRequest(omRequest);
default:
throw new OMException("Unrecognized write command type request "
+ cmdType, OMException.ResultCodes.INVALID_REQUEST);
@@ -509,7 +512,11 @@ public static GrpcTlsConfig createServerTlsConfig(SecurityConfig conf,
public static OzoneManagerProtocolProtos.OMResponse submitRequest(
OzoneManager om, OMRequest omRequest, ClientId clientId, long callId) throws ServiceException {
if (om.isRatisEnabled()) {
- return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+ if (om.isLeaderExecutorEnabled()) {
+ return om.getOMGateway().submit(omRequest);
+ } else {
+ return om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+ }
} else {
return om.getOmServerProtocol().submitRequest(NULL_RPC_CONTROLLER, omRequest);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
new file mode 100644
index 00000000000..29e57ae916d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMPersistDbRequest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.request;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.audit.OMSystemAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
+
+/**
+ * Handles the PersistDb request: applies the table updates carried by the
+ * request to the OM DB in one batch, together with the updated transaction info.
+ */
+public class OMPersistDbRequest extends OMClientRequest {
+  private static final Logger LOG = LoggerFactory.getLogger(OMPersistDbRequest.class);
+
+  public OMPersistDbRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  /**
+   * Rejects the request when ACLs are enabled and the caller is not an admin,
+   * then defers to the base class.
+   *
+   * @param ozoneManager the OM used for the ACL and admin checks
+   * @return the request returned by the base class {@code preExecute}
+   * @throws IOException an {@link OMException} with {@code ACCESS_DENIED} for
+   *     a non-admin caller, or whatever the base class throws
+   */
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    UserGroupInformation ugi = createUGIForApi();
+    if (ozoneManager.getAclsEnabled() && !ozoneManager.isAdmin(ugi)) {
+      throw new OMException("Access denied for user " + ugi + ". Admin privilege is required.",
+          OMException.ResultCodes.ACCESS_DENIED);
+    }
+    return super.preExecute(ozoneManager);
+  }
+
+  /**
+   * Writes every record of the request into its table (a record with a value
+   * is a put, a record without one is a delete), stores the transaction info
+   * and commits everything as a single batch.
+   *
+   * @param ozoneManager the OM whose metadata store is updated
+   * @param termIndex term and index stored with the transaction info
+   * @return a response carrying a PersistDbResponse on success, or an error
+   *     response when the DB update fails with an IOException
+   */
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
+    OzoneManagerProtocolProtos.OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    OzoneManagerProtocolProtos.PersistDbRequest dbUpdateRequest = getOmRequest().getPersistDbRequest();
+
+    OMMetadataManager metadataManager = ozoneManager.getMetadataManager();
+    try (BatchOperation batchOperation = metadataManager.getStore()
+        .initBatchOperation()) {
+      List tableUpdatesList = dbUpdateRequest.getTableUpdatesList();
+      for (OzoneManagerProtocolProtos.DBTableUpdate tblUpdates : tableUpdatesList) {
+        Table table = metadataManager.getTable(tblUpdates.getTableName());
+        List recordsList = tblUpdates.getRecordsList();
+        for (OzoneManagerProtocolProtos.DBTableRecord record : recordsList) {
+          if (record.hasValue()) {
+            // put
+            table.getRawTable().putWithBatch(batchOperation, record.getKey().toByteArray(),
+                record.getValue().toByteArray());
+          } else {
+            // delete
+            table.getRawTable().deleteWithBatch(batchOperation, record.getKey().toByteArray());
+          }
+        }
+      }
+      // Start from the index already stored in the DB (0 when there is none).
+      long txIndex = 0;
+      TransactionInfo transactionInfo = TransactionInfo.readTransactionInfo(metadataManager);
+      if (transactionInfo != null && transactionInfo.getIndex() != null) {
+        txIndex = transactionInfo.getIndex();
+      }
+      // Collections.max throws NoSuchElementException on an empty list, which
+      // the IOException handler below would not catch; with no request
+      // indexes the stored index is kept as is.
+      if (dbUpdateRequest.getIndexCount() > 0) {
+        txIndex = Math.max(Collections.max(dbUpdateRequest.getIndexList()).longValue(), txIndex);
+      }
+      metadataManager.getTransactionInfoTable().putWithBatch(
+          batchOperation, TRANSACTION_INFO_KEY, TransactionInfo.valueOf(termIndex, txIndex));
+      metadataManager.getStore().commitBatchOperation(batchOperation);
+      omResponse.setPersistDbResponse(OzoneManagerProtocolProtos.PersistDbResponse.newBuilder().build());
+      refreshCache(ozoneManager, tableUpdatesList);
+    } catch (IOException ex) {
+      audit(ozoneManager, dbUpdateRequest, termIndex, ex);
+      LOG.error("Db persist exception", ex);
+      return new DummyOMClientResponse(createErrorOMResponse(omResponse, ex));
+    }
+    audit(ozoneManager, dbUpdateRequest, termIndex, null);
+    return new DummyOMClientResponse(omResponse.build());
+  }
+
+  /**
+   * Writes a system audit entry for the DB persist, listing the request
+   * indexes and the transaction index.
+   *
+   * @param ozoneManager the OM providing the system audit logger
+   * @param request the PersistDb request being audited
+   * @param termIndex term and index of the transaction
+   * @param th the failure to log, or {@code null} to log a success
+   */
+  public void audit(OzoneManager ozoneManager, OzoneManagerProtocolProtos.PersistDbRequest request,
+      TermIndex termIndex, Throwable th) {
+    List indexList = request.getIndexList();
+    Map auditMap = new HashMap<>();
+    auditMap.put("requestIndexes", indexList.stream().map(String::valueOf).collect(Collectors.joining(",")));
+    auditMap.put("transactionIndex", String.valueOf(termIndex.getIndex()));
+    if (null != th) {
+      ozoneManager.getSystemAuditLogger().logWriteFailure(ozoneManager.buildAuditMessageForFailure(
+          OMSystemAction.DBPERSIST, auditMap, th));
+    } else {
+      ozoneManager.getSystemAuditLogger().logWriteSuccess(ozoneManager.buildAuditMessageForSuccess(
+          OMSystemAction.DBPERSIST, auditMap));
+    }
+  }
+
+  /** Placeholder: currently does nothing (see TODO). */
+  private void refreshCache(OzoneManager om, List tblUpdateList) {
+    // TODO no-cache, update bucket and volume cache as full table cache in no-cache
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
index f7c223eae09..0293d1f8805 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java
@@ -18,8 +18,10 @@
package org.apache.hadoop.ozone.om.request.upgrade;
import java.util.HashMap;
+import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.response.DummyOMClientResponse;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -35,6 +37,8 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import static org.apache.hadoop.ozone.OzoneConsts.PREPARE_MARKER_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.TRANSACTION_INFO_KEY;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.ratis.server.RaftServer;
@@ -67,6 +71,76 @@ public OMPrepareRequest(OMRequest omRequest) {
@Override
public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIndex termIndex) {
+    // Use the leader-executor variant when that mode is enabled, the previous path otherwise.
+    return ozoneManager.isLeaderExecutorEnabled()
+        ? validateAndUpdateCacheNew(ozoneManager, termIndex)
+        : validateAndUpdateCacheOld(ozoneManager, termIndex);
+  }
+  /**
+   * Prepare handling used when the leader executor is enabled.
+   *
+   * <p>Writes the prepare marker and the transaction info to the transaction
+   * info table, takes a snapshot and purges the logs, and then finishes the
+   * prepare state. Failures are turned into an error response instead of
+   * being thrown; the outcome is marked for audit either way.
+   *
+   * @param ozoneManager the OM being prepared
+   * @param termIndex term and index of the prepare transaction
+   * @return the prepare response, or an error response on failure
+   */
+  public OMClientResponse validateAndUpdateCacheNew(OzoneManager ozoneManager, TermIndex termIndex) {
+    final long prepareIndex = termIndex.getIndex();
+    LOG.info("OM {} Received prepare request with log {}", ozoneManager.getOMNodeId(), termIndex);
+
+    final OMRequest prepareRequest = getOmRequest();
+    final AuditLogger audit = ozoneManager.getAuditLogger();
+    final OzoneManagerProtocolProtos.UserInfo requester = prepareRequest.getUserInfo();
+    final OMResponse.Builder builder = OmResponseUtil.getOMResponseBuilder(prepareRequest);
+    builder.setCmdType(Type.Prepare);
+    OMClientResponse result = null;
+    Exception failure = null;
+
+    try {
+      PrepareResponse prepareResponse = PrepareResponse.newBuilder().setTxnID(prepareIndex).build();
+      builder.setPrepareResponse(prepareResponse);
+      result = new DummyOMClientResponse(builder.build());
+
+      // The db is updated first, the snapshot is taken afterwards.
+      TransactionInfo stored = TransactionInfo.readTransactionInfo(ozoneManager.getMetadataManager());
+      Long storedIndex = (stored != null) ? stored.getIndex() : null;
+      ozoneManager.getMetadataManager().getTransactionInfoTable().put(
+          PREPARE_MARKER_KEY, TransactionInfo.valueOf(TransactionInfo.DEFAULT_VALUE.getTerm(), prepareIndex));
+      ozoneManager.getMetadataManager().getTransactionInfoTable().put(
+          TRANSACTION_INFO_KEY, TransactionInfo.valueOf(termIndex, storedIndex));
+
+      takeSnapshotAndPurgeLogs(prepareIndex, ozoneManager.getOmRatisServer().getServerDivision());
+
+      // Persist the prepare index in a marker file: after a restart the OM
+      // stays in prepare mode while the file exists and its log indices are
+      // >= the one in the file.
+      ozoneManager.getPrepareState().finishPrepare(prepareIndex);
+
+      LOG.info("OM {} prepared at log index {}. Returning response {} with log index {}",
+          ozoneManager.getOMNodeId(), prepareIndex, prepareResponse, prepareResponse.getTxnID());
+    } catch (OMException e) {
+      failure = e;
+      LOG.error("Prepare Request Apply failed in {}. ", ozoneManager.getOMNodeId(), e);
+      result = new DummyOMClientResponse(createErrorOMResponse(builder, e));
+    } catch (IOException e) {
+      // Report PREPARE_FAILED so that a prepare failure does not cause the OM to terminate.
+      failure = e;
+      LOG.error("Prepare Request Apply failed in {}. ", ozoneManager.getOMNodeId(), e);
+      result = new DummyOMClientResponse(createErrorOMResponse(builder,
+          new OMException(e, OMException.ResultCodes.PREPARE_FAILED)));
+
+      // Lift the prepare gate and try to remove the marker file. The error
+      // response above is returned whether or not the removal succeeds.
+      try {
+        ozoneManager.getPrepareState().cancelPrepare();
+      } catch (IOException ex) {
+        LOG.error("Failed to delete prepare marker file.", ex);
+      }
+    }
+
+    markForAudit(audit, buildAuditMessage(OMAction.UPGRADE_PREPARE, new HashMap<>(), failure, requester));
+    return result;
+  }
+
+ public OMClientResponse validateAndUpdateCacheOld(OzoneManager ozoneManager, TermIndex termIndex) {
final long transactionLogIndex = termIndex.getIndex();
LOG.info("OM {} Received prepare request with log {}", ozoneManager.getOMNodeId(), termIndex);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 4506337e54d..723aaa7bc49 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -16,13 +16,17 @@
*/
package org.apache.hadoop.ozone.protocolPB;
+import static org.apache.hadoop.ipc.RpcConstants.DUMMY_CLIENT_ID;
+import static org.apache.hadoop.ipc.RpcConstants.INVALID_CALL_ID;
import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.LEADER_AND_READY;
import static org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer.RaftServerStatus.NOT_LEADER;
import static org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils.createClientRequest;
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.PrepareStatus;
import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
+import com.google.common.base.Preconditions;
import java.io.IOException;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,6 +37,7 @@
import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
@@ -57,6 +62,7 @@
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.ozone.security.S3SecurityUtil;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.ExitUtils;
@@ -251,7 +257,13 @@ public OMRequest getLastRequestToSubmit() {
*/
private OMResponse submitRequestToRatis(OMRequest request)
throws ServiceException {
- return omRatisServer.submitRequest(request);
+    // Client id and call id of the RPC call currently being served.
+    final byte[] rpcClientId = ProtobufRpcEngine.Server.getClientId();
+    final int rpcCallId = ProtobufRpcEngine.Server.getCallId();
+    if (!ozoneManager.isTestSecureOmFlag()) {
+      // Outside the secure-OM test mode the ids must not be the dummy/invalid placeholders.
+      Preconditions.checkArgument(rpcClientId != DUMMY_CLIENT_ID);
+      Preconditions.checkArgument(rpcCallId != INVALID_CALL_ID);
+    }
+    return OzoneManagerRatisUtils.submitRequest(ozoneManager, request,
+        ClientId.valueOf(UUID.nameUUIDFromBytes(rpcClientId)), rpcCallId);
}
private OMResponse submitReadRequestToOM(OMRequest request)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 576fac48c73..6b530b66025 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -426,6 +426,17 @@ public OMClientResponse handleWriteRequestImpl(OMRequest omRequest, TermIndex te
}
}
+  @Override
+  public OMClientResponse handleLeaderWriteRequest(OMClientRequest omClientRequest, TermIndex termIndex)
+      throws IOException {
+    injectPause();
+    // Time validateAndUpdateCache under the perf metric; a null response is rejected.
+    return captureLatencyNs(impl.getPerfMetrics().getValidateAndUpdateCacheLatencyNs(), () -> {
+      OMClientResponse produced = omClientRequest.validateAndUpdateCache(getOzoneManager(), termIndex);
+      return Objects.requireNonNull(produced,
+          "omClientResponse returned by validateAndUpdateCache cannot be null");
+    });
+  }
+
@VisibleForTesting
public void setInjector(FaultInjector injector) {
this.injector = injector;
@@ -496,6 +507,10 @@ protected Status exceptionToResponseStatus(IOException ex) {
*/
@Override
public void validateRequest(OMRequest omRequest) throws OMException {
+ // Delegates to the static requestParamValidation, which holds the actual checks.
+ requestParamValidation(omRequest);
+ }
+
+ public static void requestParamValidation(OMRequest omRequest) throws OMException {
Type cmdType = omRequest.getCmdType();
if (cmdType == null) {
throw new OMException("CmdType is null",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
index e60362a1ebb..82e76afc247 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -75,4 +76,13 @@ default OMClientResponse handleWriteRequest(OMRequest omRequest, TermIndex termI
* @return OMClientResponse
*/
OMClientResponse handleWriteRequestImpl(OMRequest omRequest, TermIndex termIndex) throws IOException;
+
+ /**
+ * Handle write request at leader execution.
+ *
+ * @param omClientRequest the write client request
+ * @param termIndex - ratis transaction term and index
+ * @return OMClientResponse
+ * @throws IOException if handling the request fails
+ */
+ OMClientResponse handleLeaderWriteRequest(OMClientRequest omClientRequest, TermIndex termIndex) throws IOException;
}