HDDS-11418. leader executor framework [prototype] #7406

Closed
@@ -328,6 +328,10 @@ void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
*/
void loadFromFile(File externalFile) throws IOException;

default Table<byte[], byte[]> getRawTable() {
throw new NotImplementedException("getRawTable is not implemented");
}

/**
* Class used to represent the key and value pair of a db entry.
*/
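The new default getRawTable() hook exposes the underlying byte[]-keyed table so callers can apply records that were serialized elsewhere (for example, records shipped in a PersistDb batch). A minimal sketch of that usage, assuming an implementation that overrides the default; the helper class and names below are illustrative, not part of this PR:

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.Table;

/** Hypothetical helper: writes pre-serialized records through the raw view. */
public final class RawTableWriter {
  private RawTableWriter() { }

  /**
   * Applies one already-encoded key/value pair, bypassing the typed codecs.
   * Only works for Table implementations that override getRawTable();
   * the default above throws NotImplementedException.
   */
  public static void putRaw(Table<?, ?> table, byte[] key, byte[] value)
      throws IOException {
    Table<byte[], byte[]> raw = table.getRawTable();
    raw.put(key, value);
  }
}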
@@ -134,11 +134,15 @@ private CodecBuffer encodeKeyCodecBuffer(KEY key) throws IOException {
return key == null ? null : keyCodec.toDirectCodecBuffer(key);
}

public CodecBuffer encodeValueCodecBuffer(VALUE value) throws IOException {
return value == null ? null : valueCodec.toDirectCodecBuffer(value);
}

private byte[] encodeKey(KEY key) throws IOException {
return key == null ? null : keyCodec.toPersistedFormat(key);
}

private byte[] encodeValue(VALUE value) throws IOException {
public byte[] encodeValue(VALUE value) throws IOException {
return value == null ? null : valueCodec.toPersistedFormat(value);
}

@@ -543,6 +547,11 @@ public void loadFromFile(File externalFile) throws IOException {
rawTable.loadFromFile(externalFile);
}

@Override
public Table<byte[], byte[]> getRawTable() {
return rawTable;
}

@Override
public void cleanupCache(List<Long> epochs) {
cache.cleanup(epochs);
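encodeValueCodecBuffer and encodeValue are widened to public, which suggests value serialization now happens outside TypedTable (for instance, when staging raw bytes for a PersistDb batch) while keys keep going through the existing codec path. A rough sketch of that use, assuming a bucket-info table; the class and method names here are illustrative:

import java.io.IOException;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;

/** Illustrative only: serialize a value up front so the raw bytes can be batched. */
public final class ValueEncodingSketch {
  private ValueEncodingSketch() { }

  public static byte[] toPersistedBytes(TypedTable<String, OmBucketInfo> bucketTable,
      OmBucketInfo bucketInfo) throws IOException {
    // Uses the codec registered for the table's value type.
    return bucketTable.encodeValue(bucketInfo);
  }
}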
@@ -339,6 +339,7 @@ public static boolean isReadOnly(
case AbortExpiredMultiPartUploads:
case SetSnapshotProperty:
case QuotaRepair:
case PersistDb:
case UnknownCommand:
return false;
case EchoRPC:
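Assuming this hunk is OmUtils#isReadOnly, the new case classifies PersistDb as a write command, so it goes through the replication/apply path rather than being answered locally. A tiny check, with a placeholder client id:

import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;

/** Illustrative check that PersistDb is treated as a write command. */
public final class PersistDbClassificationSketch {
  private PersistDbClassificationSketch() { }

  public static boolean isPersistDbReadOnly() {
    OMRequest request = OMRequest.newBuilder()
        .setCmdType(Type.PersistDb)
        .setClientId("client-1") // placeholder client id
        .build();
    return OmUtils.isReadOnly(request); // false after this change
  }
}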
@@ -18,15 +18,15 @@
package org.apache.hadoop.ozone.om.helpers;


import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.utils.db.Codec;
@@ -39,8 +39,6 @@
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;

import com.google.common.base.Preconditions;

/**
* A class that encapsulates Bucket Info.
*/
@@ -255,10 +253,18 @@ public String getSourceBucket() {


public long getUsedBytes() {
QuotaResource quotaResource = QuotaResource.Factory.getQuotaResource(getObjectID());
if (null != quotaResource) {
return quotaResource.getUsedBytes() + usedBytes;
}
return usedBytes;
}

public long getUsedNamespace() {
QuotaResource quotaResource = QuotaResource.Factory.getQuotaResource(getObjectID());
if (null != quotaResource) {
return quotaResource.getUsedNamespace() + usedNamespace;
}
return usedNamespace;
}

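With this change, the reported bucket usage is the persisted counter plus any in-flight reservation registered against the bucket's object id in QuotaResource. A small sketch of that overlay; the helper class and the pendingBytes value are made up for illustration:

import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.QuotaResource;

/** Illustrative only: layer an in-flight reservation on top of persisted usage. */
public final class QuotaOverlaySketch {
  private QuotaOverlaySketch() { }

  public static long effectiveUsedBytes(OmBucketInfo bucketInfo, long pendingBytes) {
    long bucketId = bucketInfo.getObjectID();
    QuotaResource.Factory.registerQuotaResource(bucketId);
    QuotaResource.Factory.getQuotaResource(bucketId).addUsedBytes(pendingBytes);
    // Persisted usedBytes plus the reservation registered above.
    return bucketInfo.getUsedBytes();
  }
}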
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Quota resource: tracks in-flight (reserved) bytes and namespace usage for a
 * resource such as a bucket.
 */
public class QuotaResource {
private final AtomicLong incUsedBytes = new AtomicLong();
private final AtomicLong incUsedNamespace = new AtomicLong();
public QuotaResource(long usedBytes, long usedNamespace) {
incUsedBytes.set(usedBytes);
incUsedNamespace.set(usedNamespace);
}

public long addUsedBytes(long bytes) {
return incUsedBytes.addAndGet(bytes);
}

public long getUsedBytes() {
return incUsedBytes.get();
}

public long addUsedNamespace(long namespace) {
return incUsedNamespace.addAndGet(namespace);
}

public long getUsedNamespace() {
return incUsedNamespace.get();
}

/**
* Factory and registry for per-resource QuotaResource instances and per-request reservations.
*/
public static class Factory {
private static final Map<Long, QuotaResource> QUOTA_RESOURCE_MAP = new ConcurrentHashMap<>();
private static final Map<Long, Map<Long, QuotaResource>> REQ_RESERVED = new ConcurrentHashMap<>();

public static QuotaResource getQuotaResource(long id) {
return QUOTA_RESOURCE_MAP.get(id);
}

public static void registerQuotaResource(long id) {
QUOTA_RESOURCE_MAP.putIfAbsent(id, new QuotaResource(0, 0));
}

public static void removeQuotaResource(long id) {
QUOTA_RESOURCE_MAP.remove(id);
}

public static void addReserveTracker(long trackId, long id, long bytes, long namespace) {
Map<Long, QuotaResource> quotaIdMap = REQ_RESERVED.computeIfAbsent(trackId, (k) -> new ConcurrentHashMap<>());
QuotaResource reserveQuota = quotaIdMap.get(id);
if (reserveQuota == null) {
quotaIdMap.put(id, new QuotaResource(bytes, namespace));
} else {
reserveQuota.addUsedBytes(bytes);
reserveQuota.addUsedNamespace(namespace);
}
}

public static void resetReservedSpace(long trackId) {
Map<Long, QuotaResource> remove = REQ_RESERVED.remove(trackId);
if (remove != null) {
for (Map.Entry<Long, QuotaResource> entry : remove.entrySet()) {
QuotaResource bucketQuota = getQuotaResource(entry.getKey());
if (null != bucketQuota) {
bucketQuota.addUsedBytes(-entry.getValue().getUsedBytes());
bucketQuota.addUsedNamespace(-entry.getValue().getUsedNamespace());
}
}
}
}
}
}
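Reading Factory together with resetReservedSpace, the intended lifecycle appears to be: register the bucket, add the reservation to both the bucket's QuotaResource and the per-request tracker, then release everything recorded under the tracker id once the request completes. A sketch of that flow, with made-up ids and sizes:

import org.apache.hadoop.ozone.om.helpers.QuotaResource;

/** Illustrative lifecycle: reserve quota for one tracked request, then roll it back. */
public final class QuotaReservationSketch {
  private QuotaReservationSketch() { }

  public static void reserveAndRelease(long bucketObjectId, long trackId) {
    QuotaResource.Factory.registerQuotaResource(bucketObjectId);

    // Reserve 1 MiB and one namespace entry for this request, and record the
    // same amounts under the tracker so they can be undone later.
    QuotaResource.Factory.getQuotaResource(bucketObjectId).addUsedBytes(1 << 20);
    QuotaResource.Factory.getQuotaResource(bucketObjectId).addUsedNamespace(1);
    QuotaResource.Factory.addReserveTracker(trackId, bucketObjectId, 1 << 20, 1);

    // ... request is executed and its DB updates are persisted ...

    // Subtracts the tracked amounts from every bucket touched by trackId.
    QuotaResource.Factory.resetReservedSpace(trackId);
  }
}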
@@ -153,6 +153,7 @@ enum Type {
GetQuotaRepairStatus = 135;
StartQuotaRepair = 136;
SnapshotMoveTableKeys = 137;
PersistDb = 138;
}

enum SafeMode {
@@ -295,6 +296,8 @@ message OMRequest {
optional GetQuotaRepairStatusRequest GetQuotaRepairStatusRequest = 133;
optional StartQuotaRepairRequest StartQuotaRepairRequest = 134;
optional SnapshotMoveTableKeysRequest SnapshotMoveTableKeysRequest = 135;
optional PersistDbRequest PersistDbRequest = 136;
optional ExecutionControlRequest ExecutionControlRequest = 137;
}

message OMResponse {
@@ -424,6 +427,7 @@ message OMResponse {
optional QuotaRepairResponse QuotaRepairResponse = 134;
optional GetQuotaRepairStatusResponse GetQuotaRepairStatusResponse = 136;
optional StartQuotaRepairResponse StartQuotaRepairResponse = 137;
optional PersistDbResponse PersistDbResponse = 138;
}

enum Status {
@@ -2232,6 +2236,7 @@ message BucketQuotaCount {
required int64 diffUsedBytes = 3;
required int64 diffUsedNamespace = 4;
required bool supportOldQuota = 5 [default=false];
optional uint64 bucketObjectId = 6;
}

message QuotaRepairResponse {
@@ -2259,6 +2264,29 @@ message OMLockDetailsProto {
optional uint64 writeLockNanos = 4;
}

message PersistDbRequest {
repeated DBTableUpdate tableUpdates = 1;
repeated BucketQuotaCount bucketQuotaCount = 3;
}
message DBTableUpdate {
required string tableName = 1;
repeated DBTableRecord records = 2;
}
message DBTableRecord {
required bytes key = 1;
optional bytes value = 2;
}
message PersistDbResponse {
}
message ExecutionControlRequest {
repeated ClientRequestInfo requestInfo = 1;
}
message ClientRequestInfo {
optional string uuidClientId = 1;
optional uint64 callId = 2;
optional OMResponse response = 3;
optional int64 index = 4;
}
/**
The OM service that takes care of Ozone namespace.
*/
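The new messages carry already-serialized table mutations from the leader to followers in one PersistDb call; a record without a value presumably represents a delete. A rough sketch of assembling such a request from the generated protobuf classes (the table name, bytes, and client id are placeholders):

import com.google.protobuf.ByteString;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBTableRecord;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBTableUpdate;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PersistDbRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;

/** Illustrative only: batch one serialized key/value pair into a PersistDb request. */
public final class PersistDbRequestSketch {
  private PersistDbRequestSketch() { }

  public static OMRequest buildPersistDbRequest(byte[] rawKey, byte[] rawValue,
      String clientId) {
    DBTableUpdate tableUpdate = DBTableUpdate.newBuilder()
        .setTableName("bucketTable") // placeholder table name
        .addRecords(DBTableRecord.newBuilder()
            .setKey(ByteString.copyFrom(rawKey))
            .setValue(ByteString.copyFrom(rawValue))
            .build())
        .build();
    return OMRequest.newBuilder()
        .setCmdType(Type.PersistDb)
        .setClientId(clientId)
        .setPersistDbRequest(PersistDbRequest.newBuilder()
            .addTableUpdates(tableUpdate)
            .build())
        .build();
  }
}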
@@ -22,7 +22,8 @@
* as present for request.
*/
public enum OMSystemAction implements AuditAction {
STARTUP;
STARTUP,
DBPERSIST;

@Override
public String getAction() {
@@ -98,6 +98,7 @@
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.ratis.execution.OMGateway;
import org.apache.hadoop.ozone.om.ratis_snapshot.OmRatisSnapshotProvider;
import org.apache.hadoop.ozone.om.ha.OMHAMetrics;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
@@ -475,6 +476,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl

private boolean fsSnapshotEnabled;

private final OMGateway omGateway;

/**
* OM Startup mode.
*/
@@ -723,6 +726,10 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
if (isOmGrpcServerEnabled) {
omS3gGrpcServer = getOmS3gGrpcServer(configuration);
}

// Initialize the OM gateway for request execution.
omGateway = new OMGateway(this);

ShutdownHookManager.get().addShutdownHook(this::saveOmMetrics,
SHUTDOWN_HOOK_PRIORITY);

@@ -1749,6 +1756,8 @@ public void start() throws IOException {
bootstrap(omNodeDetails);
}

omGateway.start();

omState = State.RUNNING;
auditMap.put("NewOmState", omState.name());
SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap));
@@ -1826,6 +1835,7 @@ public void restart() throws IOException {
}
startJVMPauseMonitor();
setStartTime();
omGateway.start();
omState = State.RUNNING;
auditMap.put("NewOmState", omState.name());
SYSTEMAUDIT.logWriteSuccess(buildAuditMessageForSuccess(OMSystemAction.STARTUP, auditMap));
@@ -2266,6 +2276,7 @@ public boolean stop() {
}
try {
omState = State.STOPPED;
omGateway.stop();
// Cancel the metrics timer and set to null.
if (metricsTimer != null) {
metricsTimer.cancel();
@@ -5032,4 +5043,8 @@ public void checkFeatureEnabled(OzoneManagerVersion feature) throws OMException
throw new OMException("Feature disabled: " + feature, OMException.ResultCodes.NOT_SUPPORTED_OPERATION);
}
}

public OMGateway getOmGateway() {
return omGateway;
}
}
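Taken together, the OzoneManager changes create the gateway in the constructor, start it in start() and restart(), stop it in stop(), and expose it via getOmGateway(). A condensed view of that lifecycle (exception handling elided; nothing beyond start() and stop() is assumed about OMGateway):

import java.io.IOException;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.execution.OMGateway;

/** Condensed lifecycle sketch for the gateway wired up in this diff. */
public final class OmGatewayLifecycleSketch {
  private OmGatewayLifecycleSketch() { }

  public static void run(OzoneManager om) throws IOException {
    OMGateway gateway = om.getOmGateway(); // accessor added by this PR
    gateway.start();                       // called from OzoneManager#start/restart
    try {
      // ... OM serves requests; writes flow through the leader executor ...
    } finally {
      gateway.stop();                      // called from OzoneManager#stop
    }
  }
}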
@@ -26,6 +26,8 @@
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.execution.request.ExecutionContext;
import org.apache.hadoop.ozone.om.ratis.execution.request.OMRequestExecutor;
import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.ratis.server.protocol.TermIndex;
@@ -124,6 +126,17 @@ public static void log(OMAuditLogger.Builder builder, TermIndex termIndex) {
}
}

public static void log(OMAuditLogger.Builder builder, ExecutionContext executionContext) {
if (builder.isLog.get()) {
if (null == builder.getAuditMap()) {
builder.setAuditMap(new HashMap<>());
}
builder.getAuditMap().put("Transaction", "" + executionContext.getIndex());
builder.getMessageBuilder().withParams(builder.getAuditMap());
builder.getAuditLogger().logWrite(builder.getMessageBuilder().build());
}
}

public static void log(OMAuditLogger.Builder builder) {
if (builder.isLog.get()) {
builder.getMessageBuilder().withParams(builder.getAuditMap());
@@ -159,6 +172,34 @@ public static void log(OMAuditLogger.Builder builder, OMClientRequest request, O
}
}

public static void log(OMAuditLogger.Builder builder, OMRequestExecutor request, OzoneManager om,
ExecutionContext executionContext, Throwable th) {
if (builder.isLog.get()) {
builder.getAuditLogger().logWrite(builder.getMessageBuilder().build());
return;
}

OMAction action = getAction(request.getOmRequest());
if (null == action) {
// no audit log defined
return;
}
if (builder.getAuditMap() == null) {
builder.setAuditMap(new HashMap<>());
}
try {
builder.getAuditMap().put("Command", request.getOmRequest().getCmdType().name());
builder.getAuditMap().put("Transaction", "" + executionContext.getIndex());
request.buildAuditMessage(action, builder.getAuditMap(),
th, request.getOmRequest().getUserInfo());
builder.setLog(true);
builder.setAuditLogger(om.getAuditLogger());
log(builder);
} catch (Exception ex) {
LOG.error("Exception occurred while writing audit log", ex);
}
}

public static Builder newBuilder() {
return new Builder();
}
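A sketch of how the new overload might be called from the executor path once a request completes; it assumes the sketch sits in the same package as OMAuditLogger (package declaration omitted), and that the executor and context instances come from the new execution package:

import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ratis.execution.request.ExecutionContext;
import org.apache.hadoop.ozone.om.ratis.execution.request.OMRequestExecutor;

/** Illustrative call site: audit an executed request with its execution index. */
public final class ExecutorAuditSketch {
  private ExecutorAuditSketch() { }

  public static void audit(OzoneManager om, OMRequestExecutor executor,
      ExecutionContext context, Throwable failure) {
    // failure may be null on success; the overload builds the audit map,
    // tags it with the execution index, and writes via the OM's audit logger.
    OMAuditLogger.log(OMAuditLogger.newBuilder(), executor, om, context, failure);
  }
}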