flink-commits mailing list archives

From se...@apache.org
Subject [26/63] [abbrv] [FLINK-1094] Reworked, improved, and tested split assigners
Date Sun, 21 Sep 2014 02:12:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
new file mode 100644
index 0000000..5b76d53
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
@@ -0,0 +1,124 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.splitassigner;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.junit.Test;
+
+
+public class DefaultSplitAssignerTest {
+
+	@Test
+	public void testSerialSplitAssignment() {
+		try {
+			final int NUM_SPLITS = 50;
+			
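+			// load some splits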
+			Set<InputSplit> splits = new HashSet<InputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new GenericInputSplit(i, NUM_SPLITS));
+			}
+			
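+			// get all available splits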
+			DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignment() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			Set<InputSplit> splits = new HashSet<InputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new GenericInputSplit(i, NUM_SPLITS));
+			}
+			
+			final DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
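+			// each thread repeatedly requests splits until the assigner is exhausted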
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					String host = "";
+					GenericInputSplit split;
+					while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous; a thread did not finish in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit(""));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c32569ae/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
new file mode 100644
index 0000000..ddad0d3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
@@ -0,0 +1,379 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.runtime.jobmanager.splitassigner;
+
+import static org.junit.Assert.*;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
+
+import org.junit.Test;
+
+
+public class LocatableSplitAssignerTest {
+	
+	@Test
+	public void testSerialSplitAssignmentWithNullHost() {
+		try {
+			final int NUM_SPLITS = 50;
+			final String[][] hosts = new String[][] {
+					new String[] { "localhost" },
+					new String[0],
+					null
+			};
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%3]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit(null)) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentAllForSameHost() {
+		try {
+			final int NUM_SPLITS = 50;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, "testhost"));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("testhost")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit(""));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentAllForRemoteHost() {
+		try {
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			final int NUM_SPLITS = 10 * hosts.length;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			while ((is = ia.getNextInputSplit("testhost")) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+			
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerialSplitAssignmentMixedLocalHost() {
+		try {
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			final int NUM_SPLITS = 10 * hosts.length;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+			}
+			
+			// get all available splits
+			LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			InputSplit is = null;
+			int i = 0;
+			while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) {
+				assertTrue(splits.remove(is));
+			}
+			
+			// check we had all
+			assertTrue(splits.isEmpty());
+			assertNull(ia.getNextInputSplit("anotherHost"));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentNullHost() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			final String[][] hosts = new String[][] {
+					new String[] { "localhost" },
+					new String[0],
+					null
+			};
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%3]));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit(null)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous; a thread did not finish in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit(""));
+			
+			assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
+			assertEquals(0, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentForSingleHost() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, "testhost"));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit("testhost")) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous; a thread did not finish in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit("testhost"));
+			
+			assertEquals(0, ia.getNumberOfRemoteAssignments());
+			assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConcurrentSplitAssignmentForMultipleHosts() {
+		try {
+			final int NUM_THREADS = 10;
+			final int NUM_SPLITS = 500;
+			final int SUM_OF_IDS = (NUM_SPLITS-1) * (NUM_SPLITS) / 2;
+			
+			final String[] hosts = { "host1", "host1", "host1", "host2", "host2", "host3" };
+			
+			// load some splits
+			Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
+			for (int i = 0; i < NUM_SPLITS; i++) {
+				splits.add(new LocatableInputSplit(i, hosts[i%hosts.length]));
+			}
+			
+			final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
+			
+			final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+			final AtomicInteger sumOfIds = new AtomicInteger(0);
+			
+			Runnable retriever = new Runnable() {
+				
+				@Override
+				public void run() {
+					final String threadHost = hosts[(int) (Math.random() * hosts.length)];
+					
+					LocatableInputSplit split;
+					while ((split = ia.getNextInputSplit(threadHost)) != null) {
+						splitsRetrieved.incrementAndGet();
+						sumOfIds.addAndGet(split.getSplitNumber());
+					}
+				}
+			};
+			
+			// create the threads
+			Thread[] threads = new Thread[NUM_THREADS];
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i] = new Thread(retriever);
+				threads[i].setDaemon(true);
+			}
+			
+			// launch concurrently
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].start();
+			}
+			
+			// sync
+			for (int i = 0; i < NUM_THREADS; i++) {
+				threads[i].join(5000);
+			}
+			
+			// verify
+			for (int i = 0; i < NUM_THREADS; i++) {
+				if (threads[i].isAlive()) {
+					fail("The concurrency test case is erroneous; a thread did not finish in time.");
+				}
+			}
+			
+			assertEquals(NUM_SPLITS, splitsRetrieved.get());
+			assertEquals(SUM_OF_IDS, sumOfIds.get());
+			
+			// nothing left
+			assertNull(ia.getNextInputSplit("testhost"));
+			
+			// at least a fraction of the splits (NUM_SPLITS / hosts.length) must be assigned locally, no matter how badly the threads race
+			assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

