Skip to content

HDDS-12161. Remove code paths for non-Ratis OM in request/response #7845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ public static File createOMDir(String dirPath) {
* @return {@link RepeatedOmKeyInfo}
*/
public static RepeatedOmKeyInfo prepareKeyForDelete(OmKeyInfo keyInfo,
long trxnLogIndex, boolean isRatisEnabled) {
long trxnLogIndex) {
// If this key is in a GDPR enforced bucket, then before moving
// KeyInfo to deletedTable, remove the GDPR related metadata and
// FileEncryptionInfo from KeyInfo.
Expand All @@ -525,7 +525,7 @@ public static RepeatedOmKeyInfo prepareKeyForDelete(OmKeyInfo keyInfo,
}

// Set the updateID
keyInfo.setUpdateID(trxnLogIndex, isRatisEnabled);
keyInfo.setUpdateID(trxnLogIndex);

//The key doesn't exist in deletedTable, so create a new instance.
return new RepeatedOmKeyInfo(keyInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,14 @@ private OmMultipartKeyInfo(Builder b) {

/** Copy constructor. */
private OmMultipartKeyInfo(OmMultipartKeyInfo b) {
super(b);
this.uploadID = b.uploadID;
this.creationTime = b.creationTime;
this.replicationConfig = b.replicationConfig;
// PartKeyInfoMap is an immutable data structure. Whenever a PartKeyInfo
// is added, it returns a new shallow copy of the PartKeyInfoMap Object
// so here we can directly pass in partKeyInfoMap
this.partKeyInfoMap = b.partKeyInfoMap;
setObjectID(b.getObjectID());
setUpdateID(b.getUpdateID());
this.parentID = b.parentID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ protected WithMetadata(Builder b) {
metadata = b.metadata;
}

protected WithMetadata(WithMetadata other) {
metadata = new ConcurrentHashMap<>(other.getMetadata());
}

/**
* Custom key value metadata.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ protected WithObjectID(Builder b) {
updateID = b.updateID;
}

protected WithObjectID(WithObjectID other) {
super(other);
objectID = other.objectID;
updateID = other.updateID;
}

/**
* ObjectIDs are unique and immutable identifier for each object in the
* System.
Expand Down Expand Up @@ -73,10 +79,8 @@ public final void setObjectID(long obId) {
/**
* Sets the update ID. For each modification of this object, we will set
* this to a value greater than the current value.
* @param updateId long
* @param isRatisEnabled boolean
*/
public final void setUpdateID(long updateId, boolean isRatisEnabled) {
public final void setUpdateID(long newValue) {

// Because in non-HA, we have multiple rpc handler threads and
// transactionID is generated in OzoneManagerServerSideTranslatorPB.
Expand All @@ -103,25 +107,22 @@ public final void setUpdateID(long updateId, boolean isRatisEnabled) {
// Main reason, in non-HA transaction Index after restart starts from 0.
// And also because of this same reason we don't do replay checks in non-HA.

if (isRatisEnabled && updateId < this.getUpdateID()) {
final long currentValue = getUpdateID();
if (newValue < currentValue) {
throw new IllegalArgumentException(String.format(
"Trying to set updateID to %d which is not greater than the " +
"current value of %d for %s", updateId, this.getUpdateID(),
"current value of %d for %s", newValue, currentValue,
getObjectInfo()));
}

this.setUpdateID(updateId);
updateID = newValue;
}

/** Hook method, customized in subclasses. */
public String getObjectInfo() {
return this.toString();
}

public final void setUpdateID(long updateID) {
this.updateID = updateID;
}

/** Builder for {@link WithObjectID}. */
public static class Builder extends WithMetadata.Builder {
private long objectID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.WRITE;
import static org.apache.ozone.test.GenericTestUtils.getTestStartTime;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -4934,8 +4933,6 @@ public void testUploadWithStreamAndMemoryMappedBuffer(@TempDir Path dir) throws
@Test
public void testParallelDeleteBucketAndCreateKey() throws IOException,
InterruptedException, TimeoutException {
assumeThat(getCluster().getOzoneManager().isRatisEnabled()).isTrue();

String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ public void setUp() throws IOException {
omMetadataManager = new OmMetadataManagerImpl(conf, ozoneManager);
when(ozoneManager.getMetrics()).thenReturn(omMetrics);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
when(ozoneManager.isRatisEnabled()).thenReturn(true);
auditLogger = mock(AuditLogger.class);
when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4233,14 +4233,6 @@ public void checkLeaderStatus() throws OMNotLeaderException,
}
}

/**
* Return if Ratis is enabled or not.
*/
// TODO remove in HDDS-12161
public boolean isRatisEnabled() {
return true;
}

/**
* @return true if Ozone filesystem snapshot is enabled, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ OMClientResponse getResponse() {
public static final class Builder {
private OMMetadataManager omMetadataManager;
private Consumer<TermIndex> updateLastAppliedIndex = termIndex -> { };
private boolean isRatisEnabled = false;
private boolean isTracingEnabled = false;
private int maxUnFlushedTransactionCount = 0;
private FlushNotifier flushNotifier;
Expand All @@ -118,11 +117,6 @@ Builder setUpdateLastAppliedIndex(Consumer<TermIndex> updateLastAppliedIndex) {
return this;
}

public Builder enableRatis(boolean enableRatis) {
this.isRatisEnabled = enableRatis;
return this;
}

public Builder enableTracing(boolean enableTracing) {
this.isTracingEnabled = enableTracing;
return this;
Expand All @@ -149,9 +143,8 @@ public Builder setS3SecretManager(S3SecretManager s3SecretManager) {
}

public OzoneManagerDoubleBuffer build() {
Preconditions.assertTrue(isRatisEnabled == maxUnFlushedTransactionCount > 0L,
() -> "Ratis is " + (isRatisEnabled ? "enabled" : "disabled")
+ " but maxUnFlushedTransactionCount = " + maxUnFlushedTransactionCount);
Preconditions.assertTrue(maxUnFlushedTransactionCount > 0L,
() -> "maxUnFlushedTransactionCount = " + maxUnFlushedTransactionCount);
if (flushNotifier == null) {
flushNotifier = new FlushNotifier();
}
Expand All @@ -172,7 +165,6 @@ static Semaphore newSemaphore(int permits) {
private Queue<Entry> readyBuffer;
/**
* Limit the number of un-flushed transactions for {@link OzoneManagerStateMachine}.
* It is set to null if ratis is disabled; see {@link #isRatisEnabled()}.
*/
private final Semaphore unFlushedTransactions;

Expand Down Expand Up @@ -221,16 +213,11 @@ public OzoneManagerDoubleBuffer start() {
return this;
}

private boolean isRatisEnabled() {
return unFlushedTransactions != null;
}

/**
* Acquires the given number of permits from unFlushedTransactions,
* blocking until all are available, or the thread is interrupted.
*/
public void acquireUnFlushedTransactions(int n) throws InterruptedException {
Preconditions.assertTrue(isRatisEnabled(), "Ratis is not enabled");
unFlushedTransactions.acquire(n);
}

Expand Down Expand Up @@ -369,15 +356,6 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
metrics.updateFlushTime(Time.monotonicNow() - startTime);
}

// Complete futures first and then do other things.
// So that handler threads will be released.
if (!isRatisEnabled()) {
buffer.stream()
.map(Entry::getResponse)
.map(OMClientResponse::getFlushFuture)
.forEach(f -> f.complete(null));
}

final long accumulativeCount = flushedTransactionCount.addAndGet(flushedTransactionsSize);
final long flushedIterations = flushIterations.incrementAndGet();
LOG.debug("Sync iteration: {}, size in this iteration: {}, accumulative count: {}",
Expand All @@ -386,9 +364,7 @@ private void flushBatch(Queue<Entry> buffer) throws IOException {
// Clean up committed transactions.
cleanupCache(cleanupEpochs);

if (isRatisEnabled()) {
releaseUnFlushedTransactions(flushedTransactionsSize);
}
releaseUnFlushedTransactions(flushedTransactionsSize);
// update the last updated index in OzoneManagerStateMachine.
updateLastAppliedIndex.accept(lastTransaction);

Expand Down Expand Up @@ -555,10 +531,6 @@ private void terminate(Throwable t, int status, OMResponse omResponse) {
public synchronized void add(OMClientResponse response, TermIndex termIndex) {
currentBuffer.add(new Entry(termIndex, response));
notify();

if (!isRatisEnabled()) {
response.setFlushFuture(new CompletableFuture<>());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ public OzoneManagerDoubleBuffer buildDoubleBufferForRatis() {
.setMaxUnFlushedTransactionCount(maxUnFlushedTransactionCount)
.setThreadPrefix(threadPrefix)
.setS3SecretManager(ozoneManager.getS3SecretManager())
.enableRatis(true)
.enableTracing(isTracingEnabled)
.build()
.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// Add objectID and updateID
omBucketInfo.setObjectID(
ozoneManager.getObjectIdFromTxId(transactionLogIndex));
omBucketInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
omBucketInfo.setUpdateID(transactionLogIndex);

addDefaultAcls(omBucketInfo, omVolumeArgs, ozoneManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
omBucketInfo.setModificationTime(
setBucketPropertyRequest.getModificationTime());
// Set the updateID to current transaction log index
omBucketInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
omBucketInfo.setUpdateID(transactionLogIndex);

// Update table cache.
omMetadataManager.getBucketTable().addCacheEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
}

operationResult = omBucketAclOp.test(ozoneAcls, omBucketInfo);
omBucketInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
omBucketInfo.setUpdateID(transactionLogIndex);

if (operationResult) {
// Update the modification time when updating ACLs of Bucket.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
ozoneManager.getPrefixManager(), omBucketInfo, pathInfo, trxnLogIndex,
ozoneManager.getObjectIdFromTxId(trxnLogIndex),
ozoneManager.isRatisEnabled(), repConfig, ozoneManager.getConfiguration());
repConfig, ozoneManager.getConfiguration());
validateEncryptionKeyInfo(omBucketInfo, keyArgs);

long openVersion = omKeyInfo.getLatestVersionLocations().getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
getFileEncryptionInfo(keyArgs), ozoneManager.getPrefixManager(),
bucketInfo, pathInfoFSO, trxnLogIndex,
pathInfoFSO.getLeafNodeObjectId(),
ozoneManager.isRatisEnabled(), repConfig, ozoneManager.getConfiguration());
repConfig, ozoneManager.getConfiguration());
validateEncryptionKeyInfo(bucketInfo, keyArgs);

long openVersion = omFileInfo.getLatestVersionLocations().getVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
KEY_UNDER_LEASE_SOFT_LIMIT_PERIOD);
}
openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
openKeyInfo.setUpdateID(transactionLogIndex, ozoneManager.isRatisEnabled());
openKeyInfo.setUpdateID(transactionLogIndex);
openKeyInfo.setModificationTime(Time.now());
// add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
openKeyInfo.setModificationTime(keyArgs.getModificationTime());

// Set the UpdateID to current transactionLogIndex
openKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
openKeyInfo.setUpdateID(trxnLogIndex);

// Add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
openKeyInfo.setModificationTime(keyArgs.getModificationTime());

// Set the UpdateID to current transactionLogIndex
openKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
openKeyInfo.setUpdateID(trxnLogIndex);

// Add to cache.
addOpenTableCacheEntry(trxnLogIndex, omMetadataManager, openKeyName, keyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
}

return new OMDirectoriesPurgeResponseWithFSO(
omResponse.build(), purgeRequests, ozoneManager.isRatisEnabled(),
getBucketLayout(), volBucketInfoMap, fromSnapshotInfo, openKeyInfoMap);
omResponse.build(), purgeRequests,
getBucketLayout(), volBucketInfoMap, fromSnapshotInfo, openKeyInfoMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
openKeyToDelete = omMetadataManager.getOpenKeyTable(getBucketLayout()).get(dbOpenKeyToDeleteKey);
openKeyToDelete.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY, "true");
openKeyToDelete.setModificationTime(Time.now());
openKeyToDelete.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
openKeyToDelete.setUpdateID(trxnLogIndex);
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKeyToDeleteKey, openKeyToDelete, trxnLogIndex);
}
Expand Down Expand Up @@ -310,7 +310,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
omKeyInfo.updateLocationInfoList(locationInfoList, false);

// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
omKeyInfo.setUpdateID(trxnLogIndex);

Map<String, RepeatedOmKeyInfo> oldKeyVersionsToDeleteMap = null;
long correctedSpace = omKeyInfo.getReplicatedSize();
Expand All @@ -324,7 +324,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// Subtract the size of blocks to be overwritten.
correctedSpace -= keyToDelete.getReplicatedSize();
RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
keyToDelete, trxnLogIndex);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
// using pseudoObjId as objectId can be same in case of overwrite key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
omMetadataManager, dbOpenKeyToDeleteKey, keyName);
openKeyToDelete.getMetadata().put(OzoneConsts.OVERWRITTEN_HSYNC_KEY, "true");
openKeyToDelete.setModificationTime(Time.now());
openKeyToDelete.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
openKeyToDelete.setUpdateID(trxnLogIndex);
OMFileRequest.addOpenFileTableCacheEntry(omMetadataManager,
dbOpenKeyToDeleteKey, openKeyToDelete, keyName, fileName, trxnLogIndex);
}
Expand All @@ -230,7 +230,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
omKeyInfo.updateLocationInfoList(locationInfoList, false);

// Set the UpdateID to current transactionLogIndex
omKeyInfo.setUpdateID(trxnLogIndex, ozoneManager.isRatisEnabled());
omKeyInfo.setUpdateID(trxnLogIndex);

// If bucket versioning is turned on during the update, between key
// creation and key commit, old versions will be just overwritten and
Expand All @@ -253,7 +253,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
// Subtract the size of blocks to be overwritten.
correctedSpace -= keyToDelete.getReplicatedSize();
RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp(
keyToDelete, trxnLogIndex, ozoneManager.isRatisEnabled());
keyToDelete, trxnLogIndex);
checkBucketQuotaInBytes(omMetadataManager, omBucketInfo,
correctedSpace);
String delKeyName = omMetadataManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
keyArgs.getDataSize(), locations, getFileEncryptionInfo(keyArgs),
ozoneManager.getPrefixManager(), bucketInfo, pathInfo, trxnLogIndex,
ozoneManager.getObjectIdFromTxId(trxnLogIndex),
ozoneManager.isRatisEnabled(), replicationConfig, ozoneManager.getConfiguration());
replicationConfig, ozoneManager.getConfiguration());

validateEncryptionKeyInfo(bucketInfo, keyArgs);

Expand Down
Loading