Skip to content

HDDS-12356. granular locking framework for obs #8217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix review comments
  • Loading branch information
sumitagrawl committed Apr 4, 2025
commit 247d0c2d8225f702fed1f83bb5ece29c6f160b99
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ public Optional<Set<LockInfo>> getKeyLocks() {
return Optional.ofNullable(keyLocks);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (volumeLock != null) {
sb.append("Volume:").append(volumeLock);
}
if (bucketLock != null) {
sb.append("Bucket:").append(bucketLock);
}
if (keyLocks != null) {
sb.append("Keys:").append(keyLocks);
}
return sb.toString();
}

/**
* Builds an {@link OmLockInfo} object with optional volume, bucket or key locks.
*/
Expand Down Expand Up @@ -148,5 +163,10 @@ public int hashCode() {
public int compareTo(LockInfo other) {
return Integer.compare(hashCode(), other.hashCode());
}

@Override
public String toString() {
return "LockInfo{" + "name=" + name + ", isWriteLock=" + isWriteLock + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -53,16 +53,16 @@ public OmRequestGatekeeper() {

public OmLockObject lock(OmLockInfo lockInfo) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return OmLockObject and require caller to pass it back to unlock instead of using Autocloseable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defined OmLockObject as autoclosable, so based on usecase, it can be used with try-with-resouce.

We need return this object as it holds LockStats which is required to be set to HadoopRPC metrics with each request calling this interface.
This might be added to ozone metrics also to capture lock stats.

OmLockObject omLockObject = new OmLockObject(lockInfo);
List<Lock> locks = omLockObject.getLocks();
long startTime = Time.monotonicNowNanos();
Optional<OmLockInfo.LockInfo> optionalVolumeLock = lockInfo.getVolumeLock();
Optional<OmLockInfo.LockInfo> optionalBucketLock = lockInfo.getBucketLock();
Optional<Set<OmLockInfo.LockInfo>> optionalKeyLocks = lockInfo.getKeyLocks();
List<Lock> locks = new ArrayList<>();

if (optionalVolumeLock.isPresent()) {
OmLockInfo.LockInfo volumeLockInfo = optionalVolumeLock.get();
if (volumeLockInfo.isWriteLock()) {
omLockObject.setLockStatType(OmLockStats.Type.WRITE);
omLockObject.setReadStatsType(false);
locks.add(volumeLocks.get(volumeLockInfo.getName()).writeLock());
} else {
locks.add(volumeLocks.get(volumeLockInfo.getName()).readLock());
Expand All @@ -72,7 +72,7 @@ public OmLockObject lock(OmLockInfo lockInfo) throws IOException {
if (optionalBucketLock.isPresent()) {
OmLockInfo.LockInfo bucketLockInfo = optionalBucketLock.get();
if (bucketLockInfo.isWriteLock()) {
omLockObject.setLockStatType(OmLockStats.Type.WRITE);
omLockObject.setReadStatsType(false);
locks.add(bucketLocks.get(bucketLockInfo.getName()).writeLock());
} else {
locks.add(bucketLocks.get(bucketLockInfo.getName()).readLock());
Expand All @@ -81,35 +81,24 @@ public OmLockObject lock(OmLockInfo lockInfo) throws IOException {

if (optionalKeyLocks.isPresent()) {
for (ReadWriteLock keyLock: keyLocks.bulkGet(optionalKeyLocks.get())) {
omLockObject.setLockStatType(OmLockStats.Type.WRITE);
omLockObject.setReadStatsType(false);
locks.add(keyLock.writeLock());
}
}

try {
acquireLocks(locks);
acquireLocks(locks, omLockObject.getLocks());
lockStatsBegin(omLockObject.getLockStats(), Time.monotonicNowNanos(), startTime);
} catch (InterruptedException e) {
locks.clear();
Thread.currentThread().interrupt();
throw new OMException("waiting for locks is interrupted", OMException.ResultCodes.INTERNAL_ERROR);
throw new OMException("Waiting for locks is interrupted, " + lockInfo, OMException.ResultCodes.INTERNAL_ERROR);
} catch (TimeoutException e) {
locks.clear();
throw new OMException("Unable to get locks, timeout occurred", OMException.ResultCodes.TIMEOUT);
throw new OMException("Timeout occurred for locks " + lockInfo, OMException.ResultCodes.TIMEOUT);
}
return omLockObject;
}

/*
Optional: If we want more diagnostic info on the type of lock that failed to be acquired (volume, bucket, or key),
We can make the parameter a list of objects that wrap the Lock with information about its type.
Note that logging the specific volume, bucket or keys this lock was trying to acquire is not helpful and
misleading because collisions within the stripe lock might mean we are blocked on a request for a completely
different part of the namespace.
Obtaining the thread ID that we were waiting on would be more useful, but there is no easy way to do that.
*/
private void acquireLocks(List<Lock> locks) throws TimeoutException, InterruptedException {
List<Lock> acquiredLocks = new ArrayList<>(locks.size());
private void acquireLocks(List<Lock> locks, Stack<Lock> acquiredLocks) throws TimeoutException, InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrently, if the lock.tryLock(..) is interrupted, the partial lock won't be released. Getting everything right with timeout is not easy.

If we are adding timeout support, we have to update the design doc first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added section for having timeout in design doc for lock

for (Lock lock: locks) {
if (lock.tryLock(LOCK_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS)) {
try {
Expand All @@ -127,63 +116,67 @@ private void acquireLocks(List<Lock> locks) throws TimeoutException, Interrupted
}
}

public void unlock(OmLockObject lockObject) {
releaseLocks(lockObject.getLocks());
lockStatsEnd(lockObject.getLockStats(), lockObject.getLockStatType());
lockObject.getLocks().clear();
}

private void releaseLocks(List<Lock> locks) {
ListIterator<Lock> reverseIterator = locks.listIterator(locks.size());
while (reverseIterator.hasPrevious()) {
Lock lock = reverseIterator.previous();
lock.unlock();
private void releaseLocks(Stack<Lock> locks) {
while (!locks.empty()) {
locks.pop().unlock();
}
}

private static void lockStatsBegin(OmLockStats lockStats, long endTime, long startTime) {
lockStats.add(endTime - startTime, OmLockStats.Type.WAIT);
lockStats.addWaitLockNanos(endTime - startTime);
lockStats.setLockStartTime(endTime);
}

private static void lockStatsEnd(OmLockStats lockStats, OmLockStats.Type type) {
private static void lockStatsEnd(OmLockStats lockStats, boolean readStatsType) {
if (lockStats.getLockStartTime() > 0) {
lockStats.add(Time.monotonicNowNanos() - lockStats.getLockStartTime(), type);
if (readStatsType) {
lockStats.addReadLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime());
} else {
lockStats.addWriteLockNanos(Time.monotonicNowNanos() - lockStats.getLockStartTime());
}
}
}

/**
* Lock information after taking locks.
* Lock information after taking locks, and to be used to release locks.
*/
public static class OmLockObject {
public static class OmLockObject implements AutoCloseable {
private final OmLockInfo omLockInfo;
private final List<Lock> locks = new ArrayList<>();
private final Stack<Lock> locks = new Stack<>();
private final OmLockStats lockStats = new OmLockStats();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these fields intended to be static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, as this is referred for each request, and can not be static.

private OmLockStats.Type lockStatType = OmLockStats.Type.READ;
private boolean readStatsType = true;

public OmLockObject(OmLockInfo lockInfoProvider) {
this.omLockInfo = lockInfoProvider;
}

public List<Lock> getLocks() {
public Stack<Lock> getLocks() {
return locks;
}

public OmLockStats getLockStats() {
return lockStats;
}

public OmLockStats.Type getLockStatType() {
return lockStatType;
public void setReadStatsType(boolean readStatsType) {
this.readStatsType = readStatsType;
}

public void setLockStatType(OmLockStats.Type lockStatType) {
this.lockStatType = lockStatType;
public boolean getReadStatsType() {
return readStatsType;
}

public OmLockInfo getOmLockInfo() {
return omLockInfo;
}

@Override
public void close() throws IOException {
while (!locks.empty()) {
locks.pop().unlock();
}
lockStatsEnd(lockStats, readStatsType);
}
}

/**
Expand Down Expand Up @@ -215,28 +208,16 @@ public long getWriteLockNanos() {
return writeLockNanos;
}

void add(long timeNanos, Type type) {
switch (type) {
case WAIT:
waitLockNanos += timeNanos;
break;
case READ:
readLockNanos += timeNanos;
break;
case WRITE:
writeLockNanos += timeNanos;
break;
default:
}
void addWaitLockNanos(long timeNanos) {
waitLockNanos += timeNanos;
}

void addReadLockNanos(long timeNanos) {
readLockNanos += timeNanos;
}

/**
* lock time stat type.
*/
public enum Type {
WAIT,
READ,
WRITE
void addWriteLockNanos(long timeNanos) {
writeLockNanos += timeNanos;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,13 @@ public void testObsLockOprWithParallelLock() throws IOException, ExecutionExcept
OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo);
assertEquals(2, lockObject.getLocks().size());

CompletableFuture<OmRequestGatekeeper.OmLockObject> rst = CompletableFuture.supplyAsync(() -> {
try {
OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(lockInfo);
omLockOpr.unlock(lockInfoAgain);
return lockInfoAgain;
CompletableFuture<Boolean> rst = CompletableFuture.supplyAsync(() -> {
try (OmRequestGatekeeper.OmLockObject ignored = omLockOpr.lock(lockInfo)) {
return true;
} catch (IOException e) {
fail("should not throw exception");
}
return null;
return false;
});

// parallel lock wait should fail as previous lock not released
Expand All @@ -62,7 +60,7 @@ public void testObsLockOprWithParallelLock() throws IOException, ExecutionExcept
}

// after unlock, the thread should be able to get lock
omLockOpr.unlock(lockObject);
lockObject.close();
rst.get();
}

Expand All @@ -73,24 +71,22 @@ public void testObsLockOprListKeyRepeated() throws IOException {
.addBucketReadLock("vol", "bucket")
.addKeyWriteLock("vol", "bucket", "testkey")
.addKeyWriteLock("vol", "bucket", "testkey2").build();
OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo);
assertEquals(3, lockObject.getLocks().size());

omLockOpr.unlock(lockObject);
try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) {
assertEquals(3, lockObject.getLocks().size());
}

lockObject = omLockOpr.lock(lockInfo);
assertEquals(3, lockObject.getLocks().size());
omLockOpr.unlock(lockObject);
try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) {
assertEquals(3, lockObject.getLocks().size());
}
}

@Test
public void testBucketReadLock() throws IOException {
OmRequestGatekeeper omLockOpr = new OmRequestGatekeeper();
OmLockInfo lockInfo = new OmLockInfo.Builder().addBucketReadLock("vol", "bucket").build();
OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo);
assertEquals(1, lockObject.getLocks().size());

omLockOpr.unlock(lockObject);
try (OmRequestGatekeeper.OmLockObject lockObject = omLockOpr.lock(lockInfo)) {
assertEquals(1, lockObject.getLocks().size());
}
}

@Test
Expand All @@ -102,15 +98,13 @@ public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionE

OmLockInfo writeLockInfo = new OmLockInfo.Builder().addBucketWriteLock("vol", "bucket").build();

CompletableFuture<OmRequestGatekeeper.OmLockObject> rst = CompletableFuture.supplyAsync(() -> {
try {
OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo);
omLockOpr.unlock(lockInfoAgain);
return lockInfoAgain;
CompletableFuture<Boolean> rst = CompletableFuture.supplyAsync(() -> {
try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) {
return true;
} catch (IOException e) {
fail("should not throw exception");
}
return null;
return false;
});

// parallel lock wait should fail as previous lock not released
Expand All @@ -122,7 +116,7 @@ public void testBucketReadWithWriteParallelLock() throws IOException, ExecutionE
}

// after unlock, the thread should be able to get lock
omLockOpr.unlock(lockObject);
lockObject.close();
rst.get();
}

Expand All @@ -135,15 +129,13 @@ public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionE
assertEquals(1, lockObject.getLocks().size());

OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeWriteLock("vol").build();
CompletableFuture<OmRequestGatekeeper.OmLockObject> rst = CompletableFuture.supplyAsync(() -> {
try {
OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo);
omLockOpr.unlock(lockInfoAgain);
return lockInfoAgain;
CompletableFuture<Boolean> rst = CompletableFuture.supplyAsync(() -> {
try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) {
return true;
} catch (IOException e) {
fail("should not throw exception");
}
return null;
return false;
});

// parallel lock wait should fail as previous lock not released
Expand All @@ -155,7 +147,7 @@ public void testVolumeReadWithWriteParallelLock() throws IOException, ExecutionE
}

// after unlock, the thread should be able to get lock
omLockOpr.unlock(lockObject);
lockObject.close();
rst.get();
}

Expand All @@ -170,15 +162,13 @@ public void testVolWriteWithVolBucketRWParallelLock() throws IOException, Execut
OmLockInfo writeLockInfo = new OmLockInfo.Builder().addVolumeReadLock("vol")
.addBucketWriteLock("vol", "buck1").build();

CompletableFuture<OmRequestGatekeeper.OmLockObject> rst = CompletableFuture.supplyAsync(() -> {
try {
OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo);
omLockOpr.unlock(lockInfoAgain);
return lockInfoAgain;
CompletableFuture<Boolean> rst = CompletableFuture.supplyAsync(() -> {
try (OmRequestGatekeeper.OmLockObject lockInfoAgain = omLockOpr.lock(writeLockInfo)) {
return true;
} catch (IOException e) {
fail("should not throw exception");
}
return null;
return false;
});

// parallel lock wait should fail as previous lock not released
Expand All @@ -190,7 +180,7 @@ public void testVolWriteWithVolBucketRWParallelLock() throws IOException, Execut
}

// after unlock, the thread should be able to get lock
omLockOpr.unlock(lockObject);
lockObject.close();
rst.get();
}
}