Skip to content

Commit 5aa7383

Browse files
adoroszlaixiaoyuyao
authored andcommitted
HDDS-2653. Improve executor memory usage in new Freon tests (#284)
1 parent 7b71678 commit 5aa7383

File tree

1 file changed

+80
-36
lines changed

1 file changed

+80
-36
lines changed

hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/BaseFreonGenerator.java

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -101,71 +101,102 @@ public class BaseFreonGenerator {
101101
private MetricRegistry metrics = new MetricRegistry();
102102

103103
private AtomicLong successCounter;
104-
105104
private AtomicLong failureCounter;
105+
private AtomicLong attemptCounter;
106106

107107
private long startTime;
108108

109109
private PathSchema pathSchema;
110+
private String spanName;
111+
private ExecutorService executor;
112+
private ProgressBar progressBar;
110113

111114
/**
112115
* The main logic to execute a test generator.
113116
*
114117
* @param provider creates the new steps to execute.
115118
*/
116119
public void runTests(TaskProvider provider) {
120+
setup(provider);
121+
startTaskRunners(provider);
122+
waitForCompletion();
123+
shutdown();
124+
reportAnyFailure();
125+
}
117126

118-
ExecutorService executor = Executors.newFixedThreadPool(threadNo);
119-
120-
ProgressBar progressBar =
121-
new ProgressBar(System.out, testNo, successCounter::get);
122-
progressBar.start();
123-
124-
startTime = System.currentTimeMillis();
125-
//schedule the execution of all the tasks.
126-
127-
for (long i = 0; i < testNo; i++) {
128-
129-
final long counter = i;
127+
/**
128+
* Performs {@code provider}-specific initialization.
129+
*/
130+
private void setup(TaskProvider provider) {
131+
//provider is usually a lambda, print out only the owner class name:
132+
spanName = provider.getClass().getSimpleName().split("\\$")[0];
133+
}
130134

131-
//provider is usually a lambda, print out only the owner class name:
132-
String spanName = provider.getClass().getSimpleName().split("\\$")[0];
135+
/**
136+
* Launches {@code threadNo} task runners in executor. Each one executes test
137+
* tasks in a loop until completion or failure.
138+
*/
139+
private void startTaskRunners(TaskProvider provider) {
140+
for (int i = 0; i < threadNo; i++) {
141+
executor.execute(() -> taskLoop(provider));
142+
}
143+
}
133144

134-
executor.execute(() -> {
135-
Scope scope =
136-
GlobalTracer.get().buildSpan(spanName)
137-
.startActive(true);
138-
try {
145+
/**
146+
* Runs test tasks in a loop until completion or failure. This is executed
147+
* concurrently in {@code executor}.
148+
*/
149+
private void taskLoop(TaskProvider provider) {
150+
while (true) {
151+
long counter = attemptCounter.getAndIncrement();
139152

140-
//in case of an other failed test, we shouldn't execute more tasks.
141-
if (!failAtEnd && failureCounter.get() > 0) {
142-
return;
143-
}
153+
//in case of an other failed test, we shouldn't execute more tasks.
154+
if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) {
155+
return;
156+
}
144157

145-
provider.executeNextTask(counter);
146-
successCounter.incrementAndGet();
147-
} catch (Exception e) {
148-
scope.span().setTag("failure", true);
149-
failureCounter.incrementAndGet();
150-
LOG.error("Error on executing task", e);
151-
} finally {
152-
scope.close();
153-
}
154-
});
158+
tryNextTask(provider, counter);
155159
}
160+
}
156161

157-
// wait until all tasks are executed
162+
/**
163+
* Runs a single test task (eg. key creation).
164+
* @param taskId unique ID of the task
165+
*/
166+
private void tryNextTask(TaskProvider provider, long taskId) {
167+
Scope scope =
168+
GlobalTracer.get().buildSpan(spanName)
169+
.startActive(true);
170+
try {
171+
provider.executeNextTask(taskId);
172+
successCounter.incrementAndGet();
173+
} catch (Exception e) {
174+
scope.span().setTag("failure", true);
175+
failureCounter.incrementAndGet();
176+
LOG.error("Error on executing task {}", taskId, e);
177+
} finally {
178+
scope.close();
179+
}
180+
}
158181

182+
/**
183+
* Waits until the requested number of tests are executed, or until any
184+
* failure in early failure mode (the default). This is run in the main
185+
* thread.
186+
*/
187+
private void waitForCompletion() {
159188
while (successCounter.get() + failureCounter.get() < testNo && (
160189
failureCounter.get() == 0 || failAtEnd)) {
161190
try {
162191
Thread.sleep(CHECK_INTERVAL_MILLIS);
163192
} catch (InterruptedException e) {
193+
Thread.currentThread().interrupt();
164194
throw new RuntimeException(e);
165195
}
166196
}
197+
}
167198

168-
//shutdown everything
199+
private void shutdown() {
169200
if (failureCounter.get() > 0 && !failAtEnd) {
170201
progressBar.terminate();
171202
} else {
@@ -177,7 +208,12 @@ public void runTests(TaskProvider provider) {
177208
} catch (Exception ex) {
178209
ex.printStackTrace();
179210
}
211+
}
180212

213+
/**
214+
* @throws RuntimeException if any tests failed
215+
*/
216+
private void reportAnyFailure() {
181217
if (failureCounter.get() > 0) {
182218
throw new RuntimeException("One ore more freon test is failed.");
183219
}
@@ -192,6 +228,7 @@ public void init() {
192228

193229
successCounter = new AtomicLong(0);
194230
failureCounter = new AtomicLong(0);
231+
attemptCounter = new AtomicLong(0);
195232

196233
if (prefix.length() == 0) {
197234
prefix = RandomStringUtils.randomAlphanumeric(10);
@@ -212,6 +249,13 @@ public void init() {
212249
}
213250
printReport();
214251
}));
252+
253+
executor = Executors.newFixedThreadPool(threadNo);
254+
255+
progressBar = new ProgressBar(System.out, testNo, successCounter::get);
256+
progressBar.start();
257+
258+
startTime = System.currentTimeMillis();
215259
}
216260

217261
/**

0 commit comments

Comments
 (0)