Skip to content

HDDS-11418. leader execution flow #7211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix test failure
  • Loading branch information
sumitagrawl committed Sep 18, 2024
commit 870b252fd01f9dc08b823844008b5a2e6ca168db
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ public void retrieveCache(Map<ByteBuffer, ByteBuffer> dataMap) {
}
}
isCommit = true;
clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,10 +37,18 @@ public class FollowerRequestExecutor {
private static final int RATIS_TASK_QUEUE_SIZE = 1000;
private final AtomicLong callId = new AtomicLong(0);
private final OzoneManager ozoneManager;
private AtomicLong uniqueIndex;
private final PoolExecutor<RequestContext> ratisSubmitter;
private final OzoneManagerRequestHandler handler;

public FollowerRequestExecutor(OzoneManager om) {
public FollowerRequestExecutor(OzoneManager om, AtomicLong uniqueIndex) {
this.ozoneManager = om;
this.uniqueIndex = uniqueIndex;
if (!om.isRatisEnabled()) {
this.handler = new OzoneManagerRequestHandler(ozoneManager);
} else {
this.handler = null;
}
ratisSubmitter = new PoolExecutor<>(RATIS_TASK_POOL_SIZE, RATIS_TASK_QUEUE_SIZE,
ozoneManager.getThreadNamePrefix(), this::ratisSubmitCommand, null);
}
Expand All @@ -59,14 +69,15 @@ private void ratisSubmitCommand(Collection<RequestContext> ctxs, PoolExecutor<Re

private void sendDbUpdateRequest(RequestContext ctx) {
try {
if (ozoneManager.isRatisEnabled()) {
throw new IOException("Non-ratis call is not supported");
if (!ozoneManager.isRatisEnabled()) {
OzoneManagerProtocolProtos.OMResponse response = OMBasicStateMachine.runCommand(ctx.getRequest(),
TermIndex.valueOf(-1, uniqueIndex.incrementAndGet()), handler, ozoneManager);
ctx.getFuture().complete(response);
return;
}
OzoneManagerProtocolProtos.OMResponse response = ozoneManager.getOmRatisServer().submitRequest(ctx.getRequest(),
ClientId.randomId(), callId.incrementAndGet());
ctx.getFuture().complete(response);
} catch (IOException ex) {
ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), ex));
} catch (Throwable th) {
ctx.getFuture().complete(createErrorResponse(ctx.getRequest(), new IOException(th)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ private void prepareAndSendRequest(

private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex termIndex) throws Exception {
try {
if (ozoneManager.isRatisEnabled()) {
throw new IOException("Non-ratis call is not supported");
if (!ozoneManager.isRatisEnabled()) {
return OMBasicStateMachine.runCommand(nextRequest, termIndex, handler, ozoneManager);
}
OMResponse response = ozoneManager.getOmRatisServer().submitRequest(nextRequest, ClientId.randomId(),
termIndex.getIndex());
Expand All @@ -319,6 +319,7 @@ private OMResponse sendDbUpdateRequest(OMRequest nextRequest, TermIndex termInde
} catch (Exception ex) {
throw ex;
}
// on success, return null as db update response ignored
return null;
}
private OMResponse createErrorResponse(OMRequest omRequest, IOException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ public void close() {
* @param request OMRequest
* @return response from OM
*/
private static OMResponse runCommand(
public static OMResponse runCommand(
OMRequest request, TermIndex termIndex, RequestHandler handler, OzoneManager om) {
OMClientResponse omClientResponse = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class OMGateway {
public OMGateway(OzoneManager om) throws IOException {
this.om = om;
this.leaderExecutor = new LeaderRequestExecutor(om, uniqueIndex);
this.followerExecutor = new FollowerRequestExecutor(om);
this.followerExecutor = new FollowerRequestExecutor(om, uniqueIndex);
if (om.isLeaderExecutorEnabled() && om.isRatisEnabled()) {
OzoneManagerRatisServer ratisServer = om.getOmRatisServer();
ratisServer.getOmBasicStateMachine().registerLeaderNotifier(this::leaderChangeNotifier);
Expand Down Expand Up @@ -156,11 +156,12 @@ private void rebuildBucketVolumeCache() throws IOException {
while (cacheItr.hasNext()) {
cachedBucketKeySet.add(cacheItr.next().getKey().getCacheKey());
}
TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>> bucItr = bucketTable.iterator();
while (bucItr.hasNext()) {
Table.KeyValue<String, OmBucketInfo> next = bucItr.next();
bucketTable.addCacheEntry(next.getKey(), next.getValue(), -1);
cachedBucketKeySet.remove(next.getKey());
try (TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>> bucItr = bucketTable.iterator()) {
while (bucItr.hasNext()) {
Table.KeyValue<String, OmBucketInfo> next = bucItr.next();
bucketTable.addCacheEntry(next.getKey(), next.getValue(), -1);
cachedBucketKeySet.remove(next.getKey());
}
}

// removing extra cache entry
Expand All @@ -174,11 +175,12 @@ private void rebuildBucketVolumeCache() throws IOException {
while (volCacheItr.hasNext()) {
cachedVolumeKeySet.add(volCacheItr.next().getKey().getCacheKey());
}
TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>> volItr = volumeTable.iterator();
while (volItr.hasNext()) {
Table.KeyValue<String, OmVolumeArgs> next = volItr.next();
volumeTable.addCacheEntry(next.getKey(), next.getValue(), -1);
cachedVolumeKeySet.remove(next.getKey());
try (TableIterator<String, ? extends Table.KeyValue<String, OmVolumeArgs>> volItr = volumeTable.iterator()) {
while (volItr.hasNext()) {
Table.KeyValue<String, OmVolumeArgs> next = volItr.next();
volumeTable.addCacheEntry(next.getKey(), next.getValue(), -1);
cachedVolumeKeySet.remove(next.getKey());
}
}

// removing extra cache entry
Expand Down