Skip to content

HDDS-12554. Support callback on completed reconfiguration #8391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,11 +43,12 @@ public abstract class BackgroundService {

// Executor to launch child tasks
private final ScheduledThreadPoolExecutor exec;
private volatile ScheduledFuture<?> scheduledHandle;
private final ThreadGroup threadGroup;
private final String serviceName;
private final long interval;
private long interval;
private final long serviceTimeoutInNanos;
private final TimeUnit unit;
private TimeUnit unit;
private final PeriodicalTask service;

public BackgroundService(String serviceName, long interval,
Expand Down Expand Up @@ -103,8 +105,25 @@ public void runPeriodicalTaskNow() throws Exception {
}

// start service
public void start() {
exec.scheduleWithFixedDelay(service, 0, interval, unit);
public synchronized void start() {
if (scheduledHandle != null && !scheduledHandle.isCancelled()) {
LOG.warn("Background service {} is already running", serviceName);
return;
}
scheduledHandle = exec.scheduleWithFixedDelay(service, 0, interval, unit);
}

protected synchronized void setInterval(long newInterval, TimeUnit newUnit) {
this.interval = newInterval;
this.unit = newUnit;
}

public synchronized void stop() {
LOG.info("Stopping {}", serviceName);
if (scheduledHandle != null) {
scheduledHandle.cancel(false); // don't interrupt running tasks
scheduledHandle = null;
}
}

public abstract BackgroundTaskQueue getTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ public String getNamespace() {
.register(REPLICATION_STREAMS_LIMIT_KEY,
this::reconfigReplicationStreamsLimit);

reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size());
} else {
LOG.info("Reconfiguration complete. No properties were changed.");
}
});

datanodeStateMachine = new DatanodeStateMachine(this, datanodeDetails, conf,
dnCertClient, secretKeyClient, this::terminateDatanode,
reconfigurationHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.conf;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.ConfigRedactor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Reconfigurable;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class to support dynamic reconfiguration of configuration properties at runtime.
*/
public abstract class ReconfigurableBase extends Configured implements Reconfigurable {
private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
private final ReconfigurationUtil reconfigurationUtil = new ReconfigurationUtil();
private Thread reconfigThread = null;
private volatile boolean shouldRun = true;
private final Object reconfigLock = new Object();
private long startTime = 0L;
private long endTime = 0L;
private Map<ReconfigurationUtil.PropertyChange, Optional<String>> status = null;
private final Collection<Consumer<ReconfigurationTaskStatus>> reconfigurationCompleteCallbacks = new ArrayList<>();

public ReconfigurableBase(Configuration conf) {
super(conf == null ? new Configuration() : conf);
}

protected abstract Configuration getNewConf();

@VisibleForTesting
public Collection<ReconfigurationUtil.PropertyChange> getChangedProperties(Configuration newConf,
Configuration oldConf) {
return this.reconfigurationUtil.parseChangedProperties(newConf, oldConf);
}

public void startReconfigurationTask() throws IOException {
synchronized (this.reconfigLock) {
String errorMessage;
if (!this.shouldRun) {
errorMessage = "The server is stopped.";
LOG.warn(errorMessage);
throw new IOException(errorMessage);
} else if (this.reconfigThread != null) {
errorMessage = "Another reconfiguration task is running.";
LOG.warn(errorMessage);
throw new IOException(errorMessage);
} else {
this.reconfigThread = new ReconfigurationThread(this);
this.reconfigThread.setDaemon(true);
this.reconfigThread.setName("Reconfiguration Task");
this.reconfigThread.start();
this.startTime = Time.now();
}
}
}

public ReconfigurationTaskStatus getReconfigurationTaskStatus() {
synchronized (this.reconfigLock) {
return this.reconfigThread != null ? new ReconfigurationTaskStatus(this.startTime, 0L, null) :
new ReconfigurationTaskStatus(this.startTime, this.endTime, this.status);
}
}

public void shutdownReconfigurationTask() {
Thread tempThread;
synchronized (this.reconfigLock) {
this.shouldRun = false;
if (this.reconfigThread == null) {
return;
}

tempThread = this.reconfigThread;
this.reconfigThread = null;
}

try {
tempThread.join();
} catch (InterruptedException ignored) {
}

}

@Override
public final void reconfigureProperty(String property, String newVal) throws ReconfigurationException {
if (this.isPropertyReconfigurable(property)) {
LOG.info("changing property " + property + " to " + newVal);
synchronized (this.getConf()) {
this.getConf().get(property);
String effectiveValue = this.reconfigurePropertyImpl(property, newVal);
if (newVal != null) {
this.getConf().set(property, effectiveValue);
} else {
this.getConf().unset(property);
}

}
} else {
throw new ReconfigurationException(property, newVal, this.getConf().get(property));
}
}

@Override
public abstract Collection<String> getReconfigurableProperties();

@Override
public boolean isPropertyReconfigurable(String property) {
return this.getReconfigurableProperties().contains(property);
}

protected abstract String reconfigurePropertyImpl(String var1, String var2) throws ReconfigurationException;

private static class ReconfigurationThread extends Thread {
private final ReconfigurableBase parent;

ReconfigurationThread(ReconfigurableBase base) {
this.parent = base;
}

@Override
public void run() {
LOG.info("Starting reconfiguration task.");
Configuration oldConf = this.parent.getConf();
Configuration newConf = this.parent.getNewConf();
Collection<ReconfigurationUtil.PropertyChange> changes = this.parent.getChangedProperties(newConf, oldConf);
Map<ReconfigurationUtil.PropertyChange, Optional<String>> results = Maps.newHashMap();
ConfigRedactor oldRedactor = new ConfigRedactor(oldConf);
ConfigRedactor newRedactor = new ConfigRedactor(newConf);

for (ReconfigurationUtil.PropertyChange change : changes) {
String errorMessage = null;
String oldValRedacted = oldRedactor.redact(change.prop, change.oldVal);
String newValRedacted = newRedactor.redact(change.prop, change.newVal);
if (!this.parent.isPropertyReconfigurable(change.prop)) {
LOG.info(String.format("Property %s is not configurable: old value: %s, new value: %s",
change.prop, oldValRedacted, newValRedacted));
} else {
LOG.info("Change property: " + change.prop + " from \"" +
(change.oldVal == null ? "<default>" : oldValRedacted) + "\" to \"" +
(change.newVal == null ? "<default>" : newValRedacted) + "\".");

try {
String effectiveValue = this.parent.reconfigurePropertyImpl(change.prop, change.newVal);
if (change.newVal != null) {
oldConf.set(change.prop, effectiveValue);
} else {
oldConf.unset(change.prop);
}
} catch (ReconfigurationException reconfException) {
Throwable cause = reconfException.getCause();
errorMessage = cause == null ? reconfException.getMessage() : cause.getMessage();
LOG.error("Failed to reconfigure property {}: {}", change.prop, errorMessage, reconfException);
}

results.put(change, Optional.ofNullable(errorMessage));
}
}

synchronized (this.parent.reconfigLock) {
this.parent.endTime = Time.now();
this.parent.status = Collections.unmodifiableMap(results);
this.parent.reconfigThread = null;

LOG.info("Reconfiguration completed. {} properties were updated.", results.size());

for (Consumer<ReconfigurationTaskStatus> callback : parent.reconfigurationCompleteCallbacks) {
try {
callback.accept(parent.getReconfigurationTaskStatus());
} catch (Exception e) {
LOG.warn("Reconfiguration complete callback threw exception", e);
}
}
}
}
}

public void addReconfigurationCompleteCallback(Consumer<ReconfigurationTaskStatus> callback) {
synchronized (reconfigLock) {
this.reconfigurationCompleteCallbacks.add(callback);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.conf;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

/**
* Callback interface to handle configuration changes after a reconfiguration task completes.
*/
@FunctionalInterface
public interface ReconfigurationChangeCallback {
void onPropertiesChanged(Map<String, Boolean> changedKeys, Configuration newConf);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurableBase;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.conf.ReconfigurationUtil;
import org.apache.hadoop.hdds.protocol.ReconfigureProtocol;
import org.apache.ratis.util.function.CheckedConsumer;

Expand All @@ -47,11 +49,47 @@ public class ReconfigurationHandler extends ReconfigurableBase
private final Map<String, UnaryOperator<String>> properties =
new ConcurrentHashMap<>();

private final List<ReconfigurationChangeCallback> completeCallbacks = new ArrayList<>();
private BiConsumer<ReconfigurationTaskStatus, Configuration> reconfigurationStatusListener;

public void registerCompleteCallback(ReconfigurationChangeCallback callback) {
completeCallbacks.add(callback);
}

public void setReconfigurationCompleteCallback(BiConsumer<ReconfigurationTaskStatus, Configuration>
statusListener) {
this.reconfigurationStatusListener = statusListener;
}

private void triggerCompleteCallbacks(ReconfigurationTaskStatus status, Configuration newConf) {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
Map<String, Boolean> changedKeys = new HashMap<>();
for (ReconfigurationUtil.PropertyChange change : status.getStatus().keySet()) {
boolean deleted = change.newVal == null;
changedKeys.put(change.prop, !deleted);
}
for (ReconfigurationChangeCallback callback : completeCallbacks) {
callback.onPropertiesChanged(changedKeys, newConf);
}
}

if (reconfigurationStatusListener != null) {
reconfigurationStatusListener.accept(status, newConf);
}
}

public ReconfigurationHandler(String name, OzoneConfiguration config,
CheckedConsumer<String, IOException> requireAdminPrivilege) {
super(config);
this.name = name;
this.requireAdminPrivilege = requireAdminPrivilege;

// Register callback on reconfiguration complete
addReconfigurationCompleteCallback(status -> {
Configuration newConf = getNewConf();
triggerCompleteCallbacks(status, newConf);
});

}

public ReconfigurationHandler register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,14 @@ private StorageContainerManager(OzoneConfiguration conf,
.register(OZONE_READONLY_ADMINISTRATORS,
this::reconfOzoneReadOnlyAdmins);

reconfigurationHandler.setReconfigurationCompleteCallback((status, newConf) -> {
if (status.getStatus() != null && !status.getStatus().isEmpty()) {
LOG.info("Reconfiguration completed with {} updated properties.", status.getStatus().size());
} else {
LOG.info("Reconfiguration complete. No properties were changed.");
}
});

initializeSystemManagers(conf, configurator);

if (isSecretKeyEnable(securityConfig)) {
Expand Down
Loading