flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [22/22] git commit: Add some options for slot-based scheduling and changed default parallelism to one.
Date Sun, 22 Jun 2014 21:47:43 GMT
Add some options for slot-based scheduling and changed default parallelism to one.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b4b633ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b4b633ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b4b633ea

Branch: refs/heads/master
Commit: b4b633eab9a70e14d2e0dd5252f4b092a3689093
Parents: 8c1d82a
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Jun 22 18:19:57 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Jun 22 21:09:13 2014 +0200

----------------------------------------------------------------------
 .../configuration/ConfigConstants.java          |  5 --
 .../memory/OutputViewDataOutputWrapper.java     |  2 +-
 .../util/InstantiationUtilsTest.java            |  4 +-
 .../conf/stratosphere-conf.yaml                 |  2 +
 .../api/java/io/CollectionInputFormat.java      | 15 ++--
 .../api/java/io/CollectionInputFormatTest.java  | 79 ++++++++++++++++++++
 .../nephele/executiongraph/ExecutionGraph.java  |  4 +-
 .../nephele/jobmanager/JobManager.java          |  4 +-
 .../nephele/taskmanager/TaskManager.java        | 19 +++--
 .../executiongraph/ExecutionGraphTest.java      |  4 +-
 .../nephele/jobmanager/JobManagerITCase.java    |  2 +-
 11 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
index eff48cc..b4699b3 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java
@@ -29,11 +29,6 @@ public final class ConfigConstants {
 	 * The config parameter defining the default degree of parallelism for jobs.
 	 */
 	public static final String DEFAULT_PARALLELIZATION_DEGREE_KEY = "parallelization.degree.default";
-
-	/**
-	 * The config parameter defining the maximal intra-node parallelism for jobs.
-	 */
-	public static final String PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY = "parallelization.intra-node.default";
 	
 	// -------------------------------- Runtime -------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
index 7bb8f8c..cb636ce 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/core/memory/OutputViewDataOutputWrapper.java
@@ -110,7 +110,7 @@ public class OutputViewDataOutputWrapper implements DataOutputView {
 	@Override
 	public void write(DataInputView source, int numBytes) throws IOException {
 		for (int i = 0; i < numBytes; i++) {
-			this.delegate.writeByte(source.readByte());
+			this.delegate.writeByte(source.readUnsignedByte());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-core/src/test/java/eu/stratosphere/util/InstantiationUtilsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/util/InstantiationUtilsTest.java
b/stratosphere-core/src/test/java/eu/stratosphere/util/InstantiationUtilsTest.java
index 50c8703..8b55635 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/util/InstantiationUtilsTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/util/InstantiationUtilsTest.java
@@ -60,7 +60,5 @@ public class InstantiationUtilsTest {
 		InstantiationUtil.checkForInstantiation(TestClass.class);
 	}
 
-	private class TestClass {
-
-	}
+	private class TestClass {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-dist/src/main/stratosphere-bin/conf/stratosphere-conf.yaml
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/conf/stratosphere-conf.yaml b/stratosphere-dist/src/main/stratosphere-bin/conf/stratosphere-conf.yaml
index ccf1c21..ccc67c2 100644
--- a/stratosphere-dist/src/main/stratosphere-bin/conf/stratosphere-conf.yaml
+++ b/stratosphere-dist/src/main/stratosphere-bin/conf/stratosphere-conf.yaml
@@ -25,6 +25,8 @@ jobmanager.heap.mb: 256
 
 taskmanager.heap.mb: 512
 
+taskmanager.numberOfTaskSlots: -1
+
 parallelization.degree.default: 1
 
 #==============================================================================

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index fd5ae36..82f2755 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -37,10 +37,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T>
implements N
 
 	private static final long serialVersionUID = 1L;
 
-	private Collection<T> dataSet; // input data as collection
-
 	private TypeSerializer<T> serializer;
 
+	private transient Collection<T> dataSet; // input data as collection. transient, because
it will be serialized in a custom way
+	
 	private transient Iterator<T> iterator;
 
 	
@@ -75,7 +75,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T>
implements N
 	// --------------------------------------------------------------------------------------------
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeObject(serializer);
+		out.defaultWriteObject();
 		out.writeInt(dataSet.size());
 		
 		OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper();
@@ -86,13 +86,8 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T>
implements N
 		}
 	}
 
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException {
-		try {
-			this.serializer = (TypeSerializer<T>) in.readObject();
-		} catch (ClassNotFoundException ex){
-			throw new IOException("Could not load the serializer class.", ex);
-		}
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
+		in.defaultReadObject();
 
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
index 4388c9c..f734540 100644
--- a/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/io/CollectionInputFormatTest.java
@@ -17,6 +17,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
 import eu.stratosphere.api.java.typeutils.TypeExtractor;
 import eu.stratosphere.core.io.GenericInputSplit;
 import eu.stratosphere.types.TypeInformation;
@@ -29,7 +30,9 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 
 public class CollectionInputFormatTest {
 	public static class ElementType{
@@ -105,4 +108,80 @@ public class CollectionInputFormatTest {
 			fail(ex.toString());
 		}
 	}
+	
+	@Test
+	public void testSerializabilityStrings() {
+		
+		final String[] data = new String[] {
+				"To be, or not to be,--that is the question:--",
+				"Whether 'tis nobler in the mind to suffer",
+				"The slings and arrows of outrageous fortune",
+				"Or to take arms against a sea of troubles,",
+				"And by opposing end them?--To die,--to sleep,--",
+				"No more; and by a sleep to say we end",
+				"The heartache, and the thousand natural shocks",
+				"That flesh is heir to,--'tis a consummation",
+				"Devoutly to be wish'd. To die,--to sleep;--",
+				"To sleep! perchance to dream:--ay, there's the rub;",
+				"For in that sleep of death what dreams may come,",
+				"When we have shuffled off this mortal coil,",
+				"Must give us pause: there's the respect",
+				"That makes calamity of so long life;",
+				"For who would bear the whips and scorns of time,",
+				"The oppressor's wrong, the proud man's contumely,",
+				"The pangs of despis'd love, the law's delay,",
+				"The insolence of office, and the spurns",
+				"That patient merit of the unworthy takes,",
+				"When he himself might his quietus make",
+				"With a bare bodkin? who would these fardels bear,",
+				"To grunt and sweat under a weary life,",
+				"But that the dread of something after death,--",
+				"The undiscover'd country, from whose bourn",
+				"No traveller returns,--puzzles the will,",
+				"And makes us rather bear those ills we have",
+				"Than fly to others that we know not of?",
+				"Thus conscience does make cowards of us all;",
+				"And thus the native hue of resolution",
+				"Is sicklied o'er with the pale cast of thought;",
+				"And enterprises of great pith and moment,",
+				"With this regard, their currents turn awry,",
+				"And lose the name of action.--Soft you now!",
+				"The fair Ophelia!--Nymph, in thy orisons",
+				"Be all my sins remember'd."
+		};
+		
+		try {
+			
+			List<String> inputCollection = Arrays.asList(data);
+			CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection,
BasicTypeInfo.STRING_TYPE_INFO.createSerializer());
+			
+			// serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos);
+			oos.writeObject(inputFormat);
+			oos.close();
+			
+			// deserialize
+			ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+			ObjectInputStream ois = new ObjectInputStream(bais);
+			Object result = ois.readObject();
+			
+			assertTrue(result instanceof CollectionInputFormat);
+			
+			int i = 0;
+			@SuppressWarnings("unchecked")
+			CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
+			in.open(new GenericInputSplit());
+			
+			while (!in.reachedEnd()) {
+				assertEquals(data[i++], in.nextRecord(""));
+			}
+			
+			assertEquals(data.length, i);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
index 18395fb..5886650 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java
@@ -145,7 +145,6 @@ public class ExecutionGraph implements ExecutionListener {
 	 *        the configuration originally attached to the job graph
 	 */
 	private ExecutionGraph(final JobID jobID, final String jobName, final Configuration jobConfiguration)
{
-
 		if (jobID == null) {
 			throw new IllegalArgumentException("Argument jobID must not be null");
 		}
@@ -165,8 +164,7 @@ public class ExecutionGraph implements ExecutionListener {
 	 * @throws GraphConversionException
 	 *         thrown if the job graph is not valid and no execution graph can be constructed
from it
 	 */
-	public ExecutionGraph(final JobGraph job, final int defaultParallelism)
-																					throws GraphConversionException {
+	public ExecutionGraph(JobGraph job, int defaultParallelism) throws GraphConversionException
{
 		this(job.getJobID(), job.getName(), job.getJobConfiguration());
 
 		// Start constructing the new execution graph from given job graph

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index f3cf3a3..40e2a0b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -16,14 +16,12 @@ package eu.stratosphere.nephele.jobmanager;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -468,7 +466,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			ExecutionGraph eg;
 	
 			try {
-				eg = new ExecutionGraph(job, this.getAvailableSlots());
+				eg = new ExecutionGraph(job, 1);
 			} catch (GraphConversionException e) {
 				if (e.getCause() == null) {
 					return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 575454f..3225ab7 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -172,8 +172,12 @@ public class TaskManager implements TaskOperationProtocol {
 			throw new NullPointerException("Execution mode must not be null.");
 		}
 		
-		
-//		LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
+		try {
+			LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
+		} catch (Throwable t) {
+			LOG.error("Cannot determine user group information.", t);
+		}
+			
 		LOG.info("User system property: " + System.getProperty("user.name"));
 		LOG.info("Execution mode: " + executionMode);
 
@@ -344,9 +348,14 @@ public class TaskManager implements TaskOperationProtocol {
 		{
 			HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
 			
-			numberOfSlots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-					Hardware.getNumberCPUCores());
-
+			int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-1);
+			if (slots == -1) { 
+				slots = Hardware.getNumberCPUCores();
+			} else if (slots <= 0) {
+				throw new Exception("Illegal value for the number of task slots: " + slots);
+			}
+			this.numberOfSlots = slots;
+			
 			// Check whether the memory size has been explicitly configured. if so that overrides
the default mechanism
 			// of taking as much as is mentioned in the hardware description
 			long memorySize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
-1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
index 2e75305..9d8700d 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
@@ -104,7 +104,7 @@ public class ExecutionGraphTest {
 
 			LibraryCacheManager.register(jobID, new String[0]);
 
-			final ExecutionGraph eg = new ExecutionGraph(jg, -1);
+			final ExecutionGraph eg = new ExecutionGraph(jg, 1);
 
 			// test all methods of ExecutionGraph
 			final ExecutionStage executionStage = eg.getCurrentExecutionStage();
@@ -215,7 +215,7 @@ public class ExecutionGraphTest {
 			assertEquals(1, egv2.getNumberOfBackwardLinks());
 			assertEquals(1, egv2.getNumberOfForwardLinks());
 			assertEquals(0, egv2.getStageNumber());
-			assertEquals(-1, egv2.getUserDefinedNumberOfMembers());
+			assertEquals(1, egv2.getUserDefinedNumberOfMembers());
 			assertNull(egv2.getVertexToShareInstancesWith());
 
 			// test all methods of ExecutionVertex

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b4b633ea/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index 2549d4f..ffb958a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -66,7 +66,7 @@ public class JobManagerITCase {
 
 	static {
 		// no logging, because the tests create expected exception
-		LogUtils.initializeDefaultConsoleLogger(Level.INFO);
+		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
 	}
 	
 	/**


Mime
View raw message