Skip to content

HDDS-11418. leader executor framework [prototype] #7406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
*/
void loadFromFile(File externalFile) throws IOException;

/**
 * Returns the untyped {@code byte[]}-keyed view of this table.
 * Default implementation throws; only implementations backed by a raw
 * key/value table are expected to override this.
 */
default Table<byte[], byte[]> getRawTable() {
throw new NotImplementedException("getRawTable is not implemented");
}

/**
* Class used to represent the key and value pair of a db entry.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,15 @@ private CodecBuffer encodeKeyCodecBuffer(KEY key) throws IOException {
return key == null ? null : keyCodec.toDirectCodecBuffer(key);
}

/**
 * Encodes a value into a direct {@link CodecBuffer} using this table's
 * value codec. Returns {@code null} when the given value is {@code null}.
 */
public CodecBuffer encodeValueCodecBuffer(VALUE value) throws IOException {
return value == null ? null : valueCodec.toDirectCodecBuffer(value);
}

// Encodes a key to its persisted byte[] form via the key codec; a null key maps to null.
private byte[] encodeKey(KEY key) throws IOException {
return key == null ? null : keyCodec.toPersistedFormat(key);
}

private byte[] encodeValue(VALUE value) throws IOException {
// Encodes a value to its persisted byte[] form via the value codec; a null value maps to null.
// NOTE(review): visibility widened from private to public so external callers can
// pre-encode values (e.g. for raw-table batch writes) — confirm intended API surface.
public byte[] encodeValue(VALUE value) throws IOException {
return value == null ? null : valueCodec.toPersistedFormat(value);
}

Expand Down Expand Up @@ -543,6 +547,11 @@ public void loadFromFile(File externalFile) throws IOException {
rawTable.loadFromFile(externalFile);
}

/** Returns the backing raw byte[] table underlying this typed table. */
@Override
public Table<byte[], byte[]> getRawTable() {
return rawTable;
}

@Override
public void cleanupCache(List<Long> epochs) {
cache.cleanup(epochs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ public static boolean isReadOnly(
case AbortExpiredMultiPartUploads:
case SetSnapshotProperty:
case QuotaRepair:
case PersistDb:
case UnknownCommand:
return false;
case EchoRPC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ enum Type {
GetQuotaRepairStatus = 135;
StartQuotaRepair = 136;
SnapshotMoveTableKeys = 137;
PersistDb = 138;
}

enum SafeMode {
Expand Down Expand Up @@ -295,6 +296,8 @@ message OMRequest {
optional GetQuotaRepairStatusRequest GetQuotaRepairStatusRequest = 133;
optional StartQuotaRepairRequest StartQuotaRepairRequest = 134;
optional SnapshotMoveTableKeysRequest SnapshotMoveTableKeysRequest = 135;
optional PersistDbRequest PersistDbRequest = 136;
optional ExecutionControlRequest ExecutionControlRequest = 137;
}

message OMResponse {
Expand Down Expand Up @@ -424,6 +427,7 @@ message OMResponse {
optional QuotaRepairResponse QuotaRepairResponse = 134;
optional GetQuotaRepairStatusResponse GetQuotaRepairStatusResponse = 136;
optional StartQuotaRepairResponse StartQuotaRepairResponse = 137;
optional PersistDbResponse PersistDbResponse = 138;
}

enum Status {
Expand Down Expand Up @@ -2232,6 +2236,7 @@ message BucketQuotaCount {
required int64 diffUsedBytes = 3;
required int64 diffUsedNamespace = 4;
required bool supportOldQuota = 5 [default=false];
optional uint64 bucketObjectId = 6;
}

message QuotaRepairResponse {
Expand Down Expand Up @@ -2259,6 +2264,29 @@ message OMLockDetailsProto {
optional uint64 writeLockNanos = 4;
}

// Batch of raw DB table mutations (plus bucket quota deltas) that the leader
// sends for persistence in a single request.
message PersistDbRequest {
repeated DBTableUpdate tableUpdates = 1;
// NOTE(review): field number 2 is skipped; do not reuse it without checking
// whether it was reserved for a removed/planned field.
repeated BucketQuotaCount bucketQuotaCount = 3;
}
// All record mutations targeting one table (column family), identified by name.
message DBTableUpdate {
required string tableName = 1;
repeated DBTableRecord records = 2;
}
// A single key/value record. NOTE(review): value is optional — presumably an
// absent value denotes a delete of the key; confirm against the handler.
message DBTableRecord {
required bytes key = 1;
optional bytes value = 2;
}
// Empty ack for PersistDbRequest.
message PersistDbResponse {
}
// Control metadata carrying per-client request/response bookkeeping.
message ExecutionControlRequest {
repeated ClientRequestInfo requestInfo = 1;
}
// Identifies an original client call (uuidClientId + callId) together with the
// response recorded for it and its execution index.
message ClientRequestInfo {
optional string uuidClientId = 1;
optional uint64 callId = 2;
optional OMResponse response = 3;
optional int64 index = 4;
}
/**
The OM service that takes care of Ozone namespace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
* as present for request.
*/
public enum OMSystemAction implements AuditAction {
STARTUP;
STARTUP,
DBPERSIST;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.ratis.execution.OMGateway;
import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
Expand Down Expand Up @@ -475,6 +476,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl

private boolean fsSnapshotEnabled;

private final OMGateway omGateway;

/**
* OM Startup mode.
*/
Expand Down Expand Up @@ -723,6 +726,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
if (isOmGrpcServerEnabled) {
omS3gGrpcServer = getOmS3gGrpcServer(configuration);
}

// init om gateway for request
omGateway = new OMGateway(this);

ShutdownHookManager.get().addShutdownHook(this::saveOmMetrics,
SHUTDOWN_HOOK_PRIORITY);

Expand Down Expand Up @@ -1749,6 +1756,8 @@ public void start() throws IOException {
bootstrap(omNodeDetails);
}

omGateway.start();

omState = State.RUNNING;
auditMap.put("NewOmState", omState.name());
SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap));
Expand Down Expand Up @@ -1826,6 +1835,7 @@ public void restart() throws IOException {
}
startJVMPauseMonitor();
setStartTime();
omGateway.start();
omState = State.RUNNING;
auditMap.put("NewOmState", omState.name());
SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap));
Expand Down Expand Up @@ -2266,6 +2276,7 @@ public boolean stop() {
}
try {
omState = State.STOPPED;
omGateway.stop();
// Cancel the metrics timer and set to null.
if (metricsTimer != null) {
metricsTimer.cancel();
Expand Down Expand Up @@ -5032,4 +5043,8 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION);
}
}

public OMGateway getOmGateway() {
return omGateway;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.execution.request.ExecutionContext;
import org.apache.hadoop.ozone.om.ratis.execution.request.OMRequestBase;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ratis.server.protocol.TermIndex;
Expand Down Expand Up @@ -124,6 +126,17 @@ public static void log(OMAuditLogger.Builder builder, TermIndex termIndex) {
}
}

/**
 * Emits the already-built audit message for an executed request, tagging the
 * audit map with the execution index under the "Transaction" key.
 * No-op when the builder is not flagged for logging.
 */
public static void log(OMAuditLogger.Builder builder, ExecutionContext executionContext) {
  if (!builder.isLog.get()) {
    return;
  }
  if (builder.getAuditMap() == null) {
    builder.setAuditMap(new HashMap<>());
  }
  builder.getAuditMap().put("Transaction", String.valueOf(executionContext.getIndex()));
  builder.getMessageBuilder().withParams(builder.getAuditMap());
  builder.getAuditLogger().logWrite(builder.getMessageBuilder().build());
}

public static void log(OMAuditLogger.Builder builder) {
if (builder.isLog.get()) {
builder.getMessageBuilder().withParams(builder.getAuditMap());
Expand Down Expand Up @@ -159,6 +172,34 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O
}
}

/**
 * Builds (if needed) and emits an audit message for an executed request.
 * If the builder was already populated (isLog set), the prepared message is
 * written directly. Otherwise the audit map is assembled here from the
 * request's command type, the execution index, and the request's own audit
 * message, then delegated to {@link #log(Builder)}.
 *
 * @param builder          audit builder, possibly pre-populated
 * @param request          the executed request (supplies command and user info)
 * @param om               source of the audit logger
 * @param executionContext supplies the execution index logged as "Transaction"
 * @param th               failure cause, or null on success
 */
public static void log(OMAuditLogger.Builder builder, OMRequestBase request, OzoneManager om,
ExecutionContext executionContext, Throwable th) {
if (builder.isLog.get()) {
// Message was fully prepared by the request handler; just emit it.
builder.getAuditLogger().logWrite(builder.getMessageBuilder().build());
return;
}

OMAction action = getAction(request.getOmRequest());
if (null == action) {
// no audit log defined
return;
}
if (builder.getAuditMap() == null) {
builder.setAuditMap(new HashMap<>());
}
try {
builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name());
builder.getAuditMap().put("Transaction", "" + executionContext.getIndex());
request.buildAuditMessage(action, builder.getAuditMap(),
th, request.getOmRequest().getUserInfo());
builder.setLog(true);
builder.setAuditLogger(om.getAuditLogger());
log(builder);
} catch (Exception ex) {
// Audit failure must not fail the request path; log and continue.
LOG.error("Exception occurred while write audit log, ", ex);
}
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.hadoop.ozone.om.lock;

import com.google.common.util.concurrent.Striped;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* key locking.
*/
/**
 * Striped per-key read/write locking used to serialize concurrent access to
 * individual keys. Every acquisition waits at most {@link #LOCK_TIMEOUT} ms
 * and fails with an {@link OMException} (TIMEOUT) instead of blocking forever.
 *
 * Bulk acquisition goes through {@code Striped.bulkGet}, which returns locks
 * in a consistent internal order, so concurrent bulk callers cannot deadlock
 * against each other. On any bulk failure, locks acquired so far are released
 * in reverse order before the exception propagates.
 */
public class KeyLock {
  private static final Logger LOG = LoggerFactory.getLogger(KeyLock.class);
  // Maximum wait for a single lock acquisition: 10 minutes, in milliseconds.
  private static final long LOCK_TIMEOUT = 10 * 60 * 1000;
  private final Striped<ReadWriteLock> fileStripedLock;

  public KeyLock(int stripLockSize) {
    fileStripedLock = Striped.readWriteLock(stripLockSize);
  }

  /**
   * Acquires write locks for all given keys.
   *
   * @param keyList keys to write-lock
   * @return acquired locks; the caller must unlock every one of them
   * @throws IOException on timeout or interruption (nothing remains locked)
   */
  public List<Lock> lock(List<String> keyList) throws IOException {
    return acquireAll(keyList, true);
  }

  /**
   * Acquires the write lock for a single key.
   *
   * @return the held lock; the caller must unlock it
   * @throws IOException on timeout or interruption
   */
  public Lock lock(String key) throws IOException {
    LOG.debug("Key {} is locked for instance {} {}", key, this, fileStripedLock.get(key));
    try {
      Lock lockObj = fileStripedLock.get(key).writeLock();
      if (!lockObj.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
        LOG.error("Key {} lock is failed for {} after wait of {}ms", key, this, LOCK_TIMEOUT);
        throw new OMException("Unable to get write lock for " + key + " after " + LOCK_TIMEOUT + "ms"
            + ", read lock info: " + fileStripedLock.get(key).readLock(),
            OMException.ResultCodes.TIMEOUT);
      }
      return lockObj;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      // Fix: this is the write-lock path; the message previously said "read lock".
      throw new OMException("Unable to get write lock for " + key + " is interrupted",
          OMException.ResultCodes.INTERNAL_ERROR);
    }
  }

  /**
   * Acquires read locks for all given keys.
   *
   * @return acquired locks; the caller must unlock every one of them
   * @throws OMException on timeout or interruption (nothing remains locked)
   */
  public List<Lock> readLock(List<String> keyList) throws OMException {
    return acquireAll(keyList, false);
  }

  /**
   * Acquires the read lock for a single key.
   *
   * @return the held lock; the caller must unlock it
   * @throws OMException on timeout or interruption
   */
  public Lock readLock(String key) throws OMException {
    try {
      LOG.debug("Key {} is read locked for instance {} {}", key, this, fileStripedLock.get(key));
      Lock lockObj = fileStripedLock.get(key).readLock();
      if (!lockObj.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
        throw new OMException("Unable to get read lock for " + key + " after " + LOCK_TIMEOUT + "ms",
            OMException.ResultCodes.TIMEOUT);
      }
      return lockObj;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new OMException("Unable to get read lock for " + key + " is interrupted",
          OMException.ResultCodes.INTERNAL_ERROR);
    }
  }

  // Shared bulk-acquisition logic for lock(List) and readLock(List); the two
  // public methods previously duplicated this loop with only the lock mode
  // and message wording differing.
  private List<Lock> acquireAll(List<String> keyList, boolean writeMode) throws OMException {
    final String mode = writeMode ? "write" : "read";
    final String otherMode = writeMode ? "read" : "write";
    List<Lock> locks = new ArrayList<>();
    boolean isSuccess = false;
    try {
      Iterable<ReadWriteLock> readWriteLocks = fileStripedLock.bulkGet(keyList);
      for (ReadWriteLock rwLock : readWriteLocks) {
        Lock lockObj = writeMode ? rwLock.writeLock() : rwLock.readLock();
        if (!lockObj.tryLock(LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
          LOG.error("Key {} lock is failed for {} after wait of {}ms", mode, this, LOCK_TIMEOUT);
          throw new OMException("Unable to get " + mode + " lock after " + LOCK_TIMEOUT + "ms"
              + ", " + otherMode + " lock info: "
              + (writeMode ? rwLock.readLock() : rwLock.writeLock()),
              OMException.ResultCodes.TIMEOUT);
        }
        locks.add(lockObj);
      }
      isSuccess = true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new OMException("Unable to get " + mode + " lock as interrupted",
          OMException.ResultCodes.INTERNAL_ERROR);
    } finally {
      if (!isSuccess) {
        // Roll back a partial acquisition in reverse order.
        Collections.reverse(locks);
        locks.forEach(Lock::unlock);
        locks.clear();
      }
    }
    return locks;
  }
}
Loading