/* * Copyright (C) 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import java.lang.invoke.VarHandle; import java.lang.invoke.MethodHandles; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** * Runs tests to validate the concurrency guarantees of VarHandle. * * The tests involve having a lot of tasks and significantly fewer threads. The tasks are stored on * a queue and each thread tries to grab a task from the queue using operations like * VarHandle.compareAndSet(). If the operation works as specified, then each task would only be * handled in a single thread, exactly once. * * The tasks just add atomically a specified integer to a total. If the total is different from the * expected one, then either some tasks were run multiple times (on multiple threads), or some task * were not run at all (skipped by all threads). */ public class Main { private static final VarHandle QA; static { QA = MethodHandles.arrayElementVarHandle(TestTask[].class); } private static final int TASK_COUNT = 10000; private static final int THREAD_COUNT = 20; /* Each test may need several retries before a concurrent failure is seen. In the past, for a * known bug, between 5 and 10 retries were sufficient. Use RETRIES to configure how many * iterations to retry for each test scenario. However, to avoid the test running for too long, * for example with gcstress, set a cap duration in MAX_RETRIES_DURATION. With this at least one * iteration would run, but there could be fewer retries if each of them takes too long. */ private static final int RETRIES = 50; // b/235431387: timeout reduced from 1 minute private static final Duration MAX_RETRIES_DURATION = Duration.ofSeconds(15); public static void main(String[] args) throws Throwable { testConcurrentProcessing(new CompareAndExchangeRunnerFactory(), "compareAndExchange"); testConcurrentProcessing(new CompareAndSetRunnerFactory(), "compareAndSet"); testConcurrentProcessing(new WeakCompareAndSetRunnerFactory(), "weakCompareAndSet"); } private static void testConcurrentProcessing(RunnerFactory factory, String testName) throws Throwable { final Duration startTs = Duration.ofNanos(System.nanoTime()); final Duration endTs = startTs.plus(MAX_RETRIES_DURATION); for (int i = 0; i < RETRIES; ++i) { concurrentProcessingTestIteration(factory, i, testName); Duration now = Duration.ofNanos(System.nanoTime()); if (0 < now.compareTo(endTs)) { break; } } } private static void concurrentProcessingTestIteration( RunnerFactory factory, int iteration, String testName) throws Throwable { final TestTask[] tasks = new TestTask[TASK_COUNT]; final AtomicInteger result = new AtomicInteger(); for (int i = 0; i < TASK_COUNT; ++i) { tasks[i] = new TestTask(Integer.valueOf(i + 1), result::addAndGet); } Thread[] threads = new Thread[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; ++i) { threads[i] = factory.createRunner(tasks); } for (int i = 0; i < THREAD_COUNT; ++i) { threads[i].start(); } for (int i = 0; i < THREAD_COUNT; ++i) { threads[i].join(); } check(result.get(), TASK_COUNT * (TASK_COUNT + 1) / 2, testName + " test result not as expected", iteration); } /** * Processes the task queue until there are no tasks left. * * The actual task-grabbing mechanism is implemented in subclasses through grabTask(). * This allows testing various mechanisms, like compareAndSet() and compareAndExchange(). */ private static abstract class TaskRunner extends Thread { protected final TestTask[] tasks; TaskRunner(TestTask[] tasks) { this.tasks = tasks; } @Override public void run() { int i = 0; while (i < TASK_COUNT) { TestTask t = (TestTask) QA.get(tasks, i); if (t == null) { ++i; continue; } if (!grabTask(t, i)) { continue; } ++i; VarHandle.releaseFence(); t.exec(); } } /** * Grabs the next task from the queue in an atomic way. * * Once a task is retrieved successfully, the queue should no longer hold a reference to it. * This would be done, for example, by swapping the task with a null value. * * @param t The task to get from the queue * @param i The index where the task is found * * @return {@code true} if the task has been retrieved and is not available to any other * threads. Otherwise {@code false}. If {@code false} is returned, then either the task was * no longer present on the queue due to another thread grabbing it, or, in case of spurious * failure, the task is still available and no other thread managed to grab it. */ protected abstract boolean grabTask(TestTask t, int i); } private static class TaskRunnerWithCompareAndExchange extends TaskRunner { TaskRunnerWithCompareAndExchange(TestTask[] tasks) { super(tasks); } @Override protected boolean grabTask(TestTask t, int i) { return (t == QA.compareAndExchange(tasks, i, t, null)); } } private static class TaskRunnerWithCompareAndSet extends TaskRunner { TaskRunnerWithCompareAndSet(TestTask[] tasks) { super(tasks); } @Override protected boolean grabTask(TestTask t, int i) { return QA.compareAndSet(tasks, i, t, null); } } private static class TaskRunnerWithWeakCompareAndSet extends TaskRunner { TaskRunnerWithWeakCompareAndSet(TestTask[] tasks) { super(tasks); } @Override protected boolean grabTask(TestTask t, int i) { return QA.weakCompareAndSet(tasks, i, t, null); } } private interface RunnerFactory { Thread createRunner(TestTask[] tasks); } private static class CompareAndExchangeRunnerFactory implements RunnerFactory { @Override public Thread createRunner(TestTask[] tasks) { return new TaskRunnerWithCompareAndExchange(tasks); } } private static class CompareAndSetRunnerFactory implements RunnerFactory { @Override public Thread createRunner(TestTask[] tasks) { return new TaskRunnerWithCompareAndSet(tasks); } } private static class WeakCompareAndSetRunnerFactory implements RunnerFactory { @Override public Thread createRunner(TestTask[] tasks) { return new TaskRunnerWithWeakCompareAndSet(tasks); } } private static class TestTask { private final Integer ord; private final Consumer action; TestTask(Integer ord, Consumer action) { this.ord = ord; this.action = action; } public void exec() { action.accept(ord); } } private static void check(int actual, int expected, String msg, int iteration) { if (actual != expected) { System.err.println(String.format( "[iteration %d] %s : %d != %d", iteration, msg, actual, expected)); System.exit(1); } } }