tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject [1/3] tez git commit: TEZ-3770. DAG-aware YARN task scheduler (jlowe)
Date Thu, 25 Jan 2018 15:44:08 GMT
Repository: tez
Updated Branches:
  refs/heads/master 3c7640d71 -> a9b8bb5a6


http://git-wip-us.apache.org/repos/asf/tez/blob/a9b8bb5a/tez-dag/src/test/java/org/apache/tez/test/ControlledScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/test/ControlledScheduledExecutorService.java b/tez-dag/src/test/java/org/apache/tez/test/ControlledScheduledExecutorService.java
new file mode 100644
index 0000000..f6da15b
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/test/ControlledScheduledExecutorService.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.MockClock.MockClockListener;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A scheduled executor service with timing that can be controlled for unit tests. */
+public class ControlledScheduledExecutorService implements ScheduledExecutorService, MockClockListener
{
+  private final MockClock clock;
+  private final PriorityQueue<ScheduledFutureTask<?>> queue = new PriorityQueue<>();
+  private final AtomicLong nextSequenceNum = new AtomicLong(0);
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  public ControlledScheduledExecutorService(MockClock clock) {
+    this.clock = clock;
+    clock.register(this);
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+    ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null,
toTimestamp(delay, unit));
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+    ScheduledFutureTask<V> task = new ScheduledFutureTask<>(callable, toTimestamp(delay,
unit));
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
+    ScheduledFutureTask<Void> task = new ScheduledFutureTask<>(command, null,
+        toTimestamp(initialDelay, unit), unit.toMillis(delay));
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
+    return scheduleWithFixedDelay(command, initialDelay, period, unit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> callable) {
+    ScheduledFutureTask<T> task = new ScheduledFutureTask<>(callable, 0);
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable runnable, T result) {
+    ScheduledFutureTask<T> task = new ScheduledFutureTask<>(runnable, result,
0);
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public Future<?> submit(Runnable runnable) {
+    ScheduledFutureTask<?> task = new ScheduledFutureTask<>(runnable, null, 0);
+    schedule(task);
+    return task;
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks) {
+    throw new UnsupportedOperationException("invokeAll not yet implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks, long timeout, TimeUnit unit) throws InterruptedException {
+    throw new UnsupportedOperationException("invokeAll not yet implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws
InterruptedException, ExecutionException {
+    throw new UnsupportedOperationException("invokeAny not yet implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
+    throw new UnsupportedOperationException("invokeAny not yet implemented");
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    submit(command);
+  }
+
+  @Override
+  public void shutdown() {
+    stopped.set(true);
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    stopped.set(true);
+    return new ArrayList<Runnable>(queue);
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return stopped.get();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
{
+    return false;
+  }
+
+  @Override
+  public void onTimeUpdated(long newTime) {
+     ScheduledFutureTask<?> task = queue.peek();
+     while (task != null && task.timestamp <= newTime) {
+       task = queue.poll();
+       runTask(task);
+       task = queue.peek();
+     }
+  }
+
+  private long now() {
+    return clock.getTime();
+  }
+
+  private long toTimestamp(long delay, TimeUnit unit) {
+    return now() + unit.toMillis(delay);
+  }
+
+  private void schedule(ScheduledFutureTask<?> task) {
+    if (isShutdown()) {
+      throw new RejectedExecutionException("Executor has been shutdown");
+    }
+    if (now() - task.timestamp >= 0) {
+      runTask(task);
+    } else {
+      queue.add(task);
+    }
+  }
+
+  private void runTask(ScheduledFutureTask<?> task) {
+    task.run();
+    if (task.isPeriodic() && !isShutdown()) {
+      task.timestamp = toTimestamp(task.period, TimeUnit.MILLISECONDS);
+      queue.add(task);
+    }
+  }
+
+  private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>
{
+    private final long sequenceNum;
+    private final long period;
+    private long timestamp;
+
+    public ScheduledFutureTask(Callable<V> callable, long timestamp) {
+      super(callable);
+      this.sequenceNum = nextSequenceNum.getAndIncrement();
+      this.timestamp = timestamp;
+      this.period = 0;
+    }
+
+    public ScheduledFutureTask(Runnable runnable, V result, long timestamp) {
+      super(runnable, result);
+      this.sequenceNum = nextSequenceNum.getAndIncrement();
+      this.timestamp = timestamp;
+      this.period = 0;
+    }
+
+    public ScheduledFutureTask(Runnable runnable, V result, long timestamp, long period)
{
+      super(runnable, result);
+      this.sequenceNum = nextSequenceNum.getAndIncrement();
+      this.timestamp = timestamp;
+      this.period = period;
+    }
+
+    @Override
+    public boolean isPeriodic() {
+      return period != 0;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(timestamp - now(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      if (o == this) {
+        return 0;
+      }
+      int result = Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+      if (result == 0 && o instanceof ScheduledFutureTask) {
+        ScheduledFutureTask<?> otherTask = (ScheduledFutureTask<?>) o;
+        result = Long.compare(sequenceNum, otherTask.sequenceNum);
+      }
+      return result;
+    }
+  }
+}


Mime
View raw message