flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [4/9] flink git commit: [FLINK-1679] use a consistent name for parallelism
Date Mon, 23 Mar 2015 08:09:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
index 7f468fa..87d7779 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaAggregateOperator.java
@@ -178,8 +178,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 			// set input
 			po.setInput(input);
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			// set parallelism
+			po.setParallelism(this.getParallelism());
 
 			return po;
 		}
@@ -195,8 +195,8 @@ public class ScalaAggregateOperator<IN> extends SingleInputOperator<IN, IN, Scal
 
 			// set input
 			po.setInput(input);
-			// set dop
-			po.setDegreeOfParallelism(this.getParallelism());
+			// set parallelism
+			po.setParallelism(this.getParallelism());
 
 			SingleInputSemanticProperties props = new SingleInputSemanticProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 00761ec..1291181 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -72,7 +72,7 @@ import scala.reflect.ClassTag
  * are named similarly. All functions are available in package
  * `org.apache.flink.api.common.functions`.
  *
- * The elements are partitioned depending on the degree of parallelism of the
+ * The elements are partitioned depending on the parallelism of the
  * [[ExecutionEnvironment]] or of one specific DataSet.
  *
  * Most of the operations have an implicit [[TypeInformation]] parameter. This is supplied by
@@ -149,13 +149,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Sets the degree of parallelism of this operation. This must be greater than 1.
+   * Sets the parallelism of this operation. This must be greater than 1.
    */
-  def setParallelism(dop: Int) = {
+  def setParallelism(parallelism: Int) = {
     javaSet match {
-      case ds: DataSource[_] => ds.setParallelism(dop)
-      case op: Operator[_, _] => op.setParallelism(dop)
-      case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(dop)
+      case ds: DataSource[_] => ds.setParallelism(parallelism)
+      case op: Operator[_, _] => op.setParallelism(parallelism)
+      case di: DeltaIterationResultSet[_, _] => di.getIterationHead.parallelism(parallelism)
       case _ =>
         throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
           "parallelism.")
@@ -164,7 +164,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Returns the degree of parallelism of this operation.
+   * Returns the parallelism of this operation.
    */
   def getParallelism: Int = javaSet match {
     case ds: DataSource[_] => ds.getParallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index cccea78..4c1e627 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -78,9 +78,9 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Sets the degree of parallelism (DOP) for operations executed through this environment.
-   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
-   * x parallel instances. This value can be overridden by specific operations using
+   * Sets the parallelism (parallelism) for operations executed through this environment.
+   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+   * with x parallel instances. This value can be overridden by specific operations using
    * [[DataSet.setParallelism]].
    * @deprecated Please use [[setParallelism]]
    */
@@ -90,8 +90,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
-   * Returns the default degree of parallelism for this execution environment. Note that this
-   * value can be overridden by individual operations using [[DataSet.setParallelism]
+   * Sets the parallelism (parallelism) for operations executed through this environment.
+   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+   * with x parallel instances. This value can be overridden by specific operations using
+   * [[DataSet.setParallelism]].
    */
   def setParallelism(parallelism: Int): Unit = {
     javaEnv.setParallelism(parallelism)
@@ -432,7 +434,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * because the framework may move the elements into the cluster if needed.
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
+   * a parallelism of one.
    */
   def fromCollection[T: ClassTag : TypeInformation](
       data: Seq[T]): DataSet[T] = {
@@ -453,7 +455,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * framework might move into the cluster if needed.
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
+   * a parallelism of one.
    */
   def fromCollection[T: ClassTag : TypeInformation] (
     data: Iterator[T]): DataSet[T] = {
@@ -473,7 +475,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * same type and must be serializable.
    *
    * * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
+   * a parallelism of one.
    */
   def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
     Validate.notNull(data, "Data must not be null.")
@@ -610,10 +612,10 @@ object ExecutionEnvironment {
    * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
    */
   def createLocalEnvironment(
-      degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors())
+      parallelism: Int = Runtime.getRuntime.availableProcessors())
       : ExecutionEnvironment = {
     val javaEnv = JavaEnv.createLocalEnvironment()
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    javaEnv.setParallelism(parallelism)
     new ExecutionEnvironment(javaEnv)
   }
 
@@ -630,8 +632,8 @@ object ExecutionEnvironment {
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[ExecutionEnvironment.setDegreeOfParallelism()]].
+   * the cluster. The execution will use the cluster's default parallelism, unless the
+   * parallelism is set explicitly via [[ExecutionEnvironment.setParallelism()]].
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.
@@ -649,12 +651,12 @@ object ExecutionEnvironment {
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program
    * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified degree of parallelism.
+   * from the cluster. The execution will use the specified parallelism.
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.
    * @param port The port of the master (JobManager), where the program should be executed.
-   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param parallelism The parallelism to use during the execution.
    * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
    *                 program uses
    *                 user-defined functions, user-defined input formats, or any libraries,
@@ -664,10 +666,10 @@ object ExecutionEnvironment {
   def createRemoteEnvironment(
       host: String,
       port: Int,
-      degreeOfParallelism: Int,
+      parallelism: Int,
       jarFiles: String*): ExecutionEnvironment = {
     val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    javaEnv.setParallelism(parallelism)
     new ExecutionEnvironment(javaEnv)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 6eaa9ae..c54ee0c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -175,17 +175,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	}
 	
 	/**
-	 * Sets the degree of parallelism for the iteration.
+	 * Sets the parallelism for the iteration.
 	 * 
-	 * @param parallelism The degree of parallelism.
+	 * @param parallelism The parallelism.
 	 */
 	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
 		this.parallelism = parallelism;
 	}
 	
 	/**
-	 * Gets the iteration's degree of parallelism.
+	 * Gets the iteration's parallelism.
 	 * 
 	 * @return The iterations parallelism, or -1, if not set.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 18826b6..6c195cb 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -75,7 +75,7 @@ public class DegreesWithExceptionITCase {
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
 
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
@@ -100,7 +100,7 @@ public class DegreesWithExceptionITCase {
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
 
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -125,7 +125,7 @@ public class DegreesWithExceptionITCase {
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
 
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
@@ -150,7 +150,7 @@ public class DegreesWithExceptionITCase {
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
 
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
@@ -175,7 +175,7 @@ public class DegreesWithExceptionITCase {
 		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
 				"localhost", cluster.getJobManagerRPCPort());
 
-		env.setDegreeOfParallelism(PARALLELISM);
+		env.setParallelism(PARALLELISM);
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index b6650d2..bbb7503 100644
--- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -31,7 +31,7 @@ public class HadoopMapredITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
-		this.setDegreeOfParallelism(4);
+		this.setParallelism(4);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 7eee629..9b4aeea 100644
--- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -32,7 +32,7 @@ public class HadoopInputOutputITCase extends JavaProgramTestBase {
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);
 		resultPath = getTempDirPath("result");
-		this.setDegreeOfParallelism(4);
+		this.setParallelism(4);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
index 28fdfa6..95e2a25 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
@@ -33,7 +33,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
   def testMapElementToPolynomialVectorSpace (): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism (2)
+    env.setParallelism (2)
 
     val input = Seq (
     LabeledVector (DenseVector (1), 1.0),
@@ -64,7 +64,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
   def testMapVectorToPolynomialVectorSpace(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism(2)
+    env.setParallelism(2)
 
     val input = Seq(
       LabeledVector(DenseVector(2, 3), 1.0),
@@ -96,7 +96,7 @@ class PolynomialBaseITCase extends ShouldMatchers {
   def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism(2)
+    env.setParallelism(2)
 
     val input = Seq(
       LabeledVector(DenseVector(2, 3), 1.0),

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
index 6e324cb..d783ecb 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
@@ -34,7 +34,7 @@ class ALSITCase extends ShouldMatchers {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism(2)
+    env.setParallelism(2)
 
     val als = ALS()
       .setIterations(iterations)

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
index eb825b9..15292b7 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
@@ -33,7 +33,7 @@ class MultipleLinearRegressionITCase extends ShouldMatchers {
   def testEstimationOfLinearFunction(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism(2)
+    env.setParallelism(2)
 
     val learner = MultipleLinearRegression()
 
@@ -69,7 +69,7 @@ class MultipleLinearRegressionITCase extends ShouldMatchers {
   def testEstimationOfCubicFunction(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setDegreeOfParallelism(2)
+    env.setParallelism(2)
 
     val polynomialBase = PolynomialBase()
     val learner = MultipleLinearRegression()

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
index 4f84467..d054975 100644
--- a/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
+++ b/flink-staging/flink-spargel/src/main/java/org/apache/flink/spargel/java/VertexCentricIteration.java
@@ -217,17 +217,17 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, Ver
 	}
 	
 	/**
-	 * Sets the degree of parallelism for the iteration.
+	 * Sets the parallelism for the iteration.
 	 * 
-	 * @param parallelism The degree of parallelism.
+	 * @param parallelism The parallelism.
 	 */
 	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The degree of parallelism must be positive, or -1 (use default).");
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
 		this.parallelism = parallelism;
 	}
 	
 	/**
-	 * Gets the iteration's degree of parallelism.
+	 * Gets the iteration's parallelism.
 	 * 
 	 * @return The iterations parallelism, or -1, if not set.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 019345f..38c26f0 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -48,7 +48,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 	public void testSpargelCompiler() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 			// compose test program
 			{
 				DataSet<Long> vertexIds = env.generateSequence(1, 2);
@@ -116,7 +116,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 			
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+			env.setParallelism(DEFAULT_PARALLELISM);
 			// compose test program
 			{
 				DataSet<Long> bcVar = env.fromElements(1L);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
index b31618c..cd48dcd 100644
--- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
+++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelTranslationTest.java
@@ -49,7 +49,7 @@ public class SpargelTranslationTest {
 			;
 			final int NUM_ITERATIONS = 13;
 			
-			final int ITERATION_DOP = 77;
+			final int ITERATION_parallelism = 77;
 			
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -75,7 +75,7 @@ public class SpargelTranslationTest {
 				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcUpdate);
 				
 				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_DOP);
+				vertexIteration.setParallelism(ITERATION_parallelism);
 				
 				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 				
@@ -93,7 +93,7 @@ public class SpargelTranslationTest {
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
 			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
 			assertEquals(ITERATION_NAME, iteration.getName());
 			
 			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
@@ -129,7 +129,7 @@ public class SpargelTranslationTest {
 			;
 			final int NUM_ITERATIONS = 13;
 			
-			final int ITERATION_DOP = 77;
+			final int ITERATION_parallelism = 77;
 			
 			
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -154,7 +154,7 @@ public class SpargelTranslationTest {
 				vertexIteration.addBroadcastSetForUpdateFunction(BC_SET_UPDATES_NAME, bcVar);
 				
 				vertexIteration.setName(ITERATION_NAME);
-				vertexIteration.setParallelism(ITERATION_DOP);
+				vertexIteration.setParallelism(ITERATION_parallelism);
 				
 				vertexIteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 				
@@ -172,7 +172,7 @@ public class SpargelTranslationTest {
 			// check the basic iteration properties
 			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
 			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_DOP, iteration.getParallelism());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
 			assertEquals(ITERATION_NAME, iteration.getName());
 			
 			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 6a95b0c..1bf2962 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -34,7 +34,7 @@ public class KafkaConsumerExample {
 			return;
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
 
 		DataStream<String> kafkaStream = env
 				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index e7abf11..b1e0f0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -36,7 +36,7 @@ public class KafkaProducerExample {
 			return;
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
 
 		@SuppressWarnings({ "unused", "serial" })
 		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 2fd31ed..d43460b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -34,7 +34,7 @@ public class KafkaSimpleConsumerExample {
 			return;
 		}
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
 		DataStream<String> kafkaStream = env
 				.addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index a094b89..95609f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -183,7 +183,7 @@ public class KafkaITCase {
 		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, TOPIC, new JavaDefaultStringSchema()));
 
 		try {
-			env.setDegreeOfParallelism(1);
+			env.setParallelism(1);
 			env.execute();
 		} catch (JobExecutionException good) {
 			Throwable t = good.getCause();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index a6eade8..a9ae77a 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -404,7 +404,7 @@ public class ConnectedDataStream<IN1, IN2> {
 
 		dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable,
 				getInputType1(), getInputType2(), outTypeInfo, functionName,
-				environment.getDegreeOfParallelism());
+				environment.getParallelism());
 
 		dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
 		dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 5768101..59ef108 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -103,7 +103,7 @@ public class DataStream<OUT> {
 	protected final StreamExecutionEnvironment environment;
 	protected final Integer id;
 	protected final String type;
-	protected int degreeOfParallelism;
+	protected int parallelism;
 	protected List<String> userDefinedNames;
 	protected StreamPartitioner<OUT> partitioner;
 	@SuppressWarnings("rawtypes")
@@ -133,7 +133,7 @@ public class DataStream<OUT> {
 		this.id = counter;
 		this.type = operatorType;
 		this.environment = environment;
-		this.degreeOfParallelism = environment.getDegreeOfParallelism();
+		this.parallelism = environment.getParallelism();
 		this.streamGraph = environment.getStreamGraph();
 		this.userDefinedNames = new ArrayList<String>();
 		this.partitioner = new DistributePartitioner<OUT>(true);
@@ -152,7 +152,7 @@ public class DataStream<OUT> {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
 		this.type = dataStream.type;
-		this.degreeOfParallelism = dataStream.degreeOfParallelism;
+		this.parallelism = dataStream.parallelism;
 		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
 		this.partitioner = dataStream.partitioner;
 		this.streamGraph = dataStream.streamGraph;
@@ -177,12 +177,12 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Gets the degree of parallelism for this operator.
+	 * Gets the parallelism for this operator.
 	 * 
 	 * @return The parallelism set for this operator.
 	 */
 	public int getParallelism() {
-		return this.degreeOfParallelism;
+		return this.parallelism;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 0dda976..b8e0a7d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -41,11 +41,11 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS
 	}
 
 	@Override
-	public DataStreamSource<OUT> setParallelism(int dop) {
-		if (dop > 1 && !isParallel) {
+	public DataStreamSource<OUT> setParallelism(int parallelism) {
+		if (parallelism > 1 && !isParallel) {
 			throw new IllegalArgumentException("Source: " + this.id + " is not a parallel source");
 		} else {
-			return (DataStreamSource<OUT>) super.setParallelism(dop);
+			return (DataStreamSource<OUT>) super.setParallelism(parallelism);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 7832777..2ac60fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -216,7 +216,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 			return out;
 		} else if (transformation == WindowTransformation.REDUCEWINDOW
 				&& parallelism != discretizedStream.getExecutionEnvironment()
-						.getDegreeOfParallelism()) {
+						.getParallelism()) {
 			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
 					new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 16284d4..8ffe1fc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -62,20 +62,20 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	/**
-	 * Sets the degree of parallelism for this operator. The degree must be 1 or
+	 * Sets the parallelism for this operator. The degree must be 1 or
 	 * more.
 	 * 
-	 * @param dop
-	 *            The degree of parallelism for this operator.
-	 * @return The operator with set degree of parallelism.
+	 * @param parallelism
+	 *            The parallelism for this operator.
+	 * @return The operator with set parallelism.
 	 */
-	public SingleOutputStreamOperator<OUT, O> setParallelism(int dop) {
-		if (dop < 1) {
+	public SingleOutputStreamOperator<OUT, O> setParallelism(int parallelism) {
+		if (parallelism < 1) {
 			throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
 		}
-		this.degreeOfParallelism = dop;
+		this.parallelism = parallelism;
 
-		streamGraph.setParallelism(id, degreeOfParallelism);
+		streamGraph.setParallelism(id, parallelism);
 
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index efbbda9..784d20c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -394,7 +394,7 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Returns the degree of parallelism for the stream discretizer. The
+	 * Returns the parallelism for the stream discretizer. The
 	 * returned parallelism is either 1 for for non-parallel global policies (or
 	 * when the input stream is non-parallel), environment parallelism for the
 	 * policies that can run in parallel (such as, any ditributed policy, reduce
@@ -408,7 +408,7 @@ public class WindowedDataStream<OUT> {
 		return isLocal
 				|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
 						.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
-				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+				|| (discretizerKey != null) ? dataStream.environment.getParallelism() : 1;
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 4824fca..e1b1453 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -29,7 +29,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 */
 	@Override
 	public void execute() throws Exception {
-		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getDegreeOfParallelism());
+		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
 	}
 
 	/**
@@ -42,6 +42,6 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	@Override
 	public void execute(String jobName) throws Exception {
 		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
-				getDegreeOfParallelism());
+				getParallelism());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 2eb05ad..3142bdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -120,8 +120,8 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public String toString() {
-		return "Remote Environment (" + this.host + ":" + this.port + " - DOP = "
-				+ (getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism()) + ")";
+		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
+				+ (getParallelism() == -1 ? "default" : getParallelism()) + ")";
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7d41d2a..7ae78f1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -32,15 +32,15 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	protected List<File> jars;
 	protected Client client;
 
-	protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+	protected StreamContextEnvironment(Client client, List<File> jars, int parallelism) {
 		this.client = client;
 		this.jars = jars;
-		if (dop > 0) {
-			setDegreeOfParallelism(dop);
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		} else {
-			setDegreeOfParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+			setParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dc1826b..9f2ccff 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -64,7 +64,7 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
  */
 public abstract class StreamExecutionEnvironment {
 
-	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
+	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 
 	private long bufferTimeout = 100;
 
@@ -93,9 +93,9 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Gets the degree of parallelism with which operation are executed by
+	 * Gets the parallelism with which operation are executed by
 	 * default. Operations can individually override this value to use a
-	 * specific degree of parallelism.
+	 * specific parallelism.
 	 * 
 	 * @return The parallelism used by operations, unless they
 	 *         override that value.
@@ -252,12 +252,12 @@ public abstract class StreamExecutionEnvironment {
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
 	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism to use as the default local
+	 * @param parallelism
+	 *            The parallelism to use as the default local
 	 *            parallelism.
 	 */
-	public static void setDefaultLocalParallelism(int degreeOfParallelism) {
-		defaultLocalDop = degreeOfParallelism;
+	public static void setDefaultLocalParallelism(int parallelism) {
+		defaultLocalParallelism = parallelism;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -617,7 +617,7 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		boolean isParallel = function instanceof ParallelSourceFunction;
-		int dop = isParallel ? getDegreeOfParallelism() : 1;
+		int parallelism = isParallel ? getParallelism() : 1;
 
 		ClosureCleaner.clean(function, true);
 		StreamInvokable<OUT, OUT> sourceInvokable = new SourceInvokable<OUT>(function);
@@ -626,7 +626,7 @@ public abstract class StreamExecutionEnvironment {
 				outTypeInfo, sourceInvokable, isParallel);
 
 		streamGraph.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo,
-				sourceName, dop);
+				sourceName, parallelism);
 
 		return returnStream;
 	}
@@ -652,7 +652,7 @@ public abstract class StreamExecutionEnvironment {
 		if (env instanceof ContextEnvironment) {
 			ContextEnvironment ctx = (ContextEnvironment) env;
 			currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(),
-					ctx.getDegreeOfParallelism());
+					ctx.getParallelism());
 		} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
 			currentEnvironment = new StreamPlanEnvironment(env);
 		} else {
@@ -662,38 +662,38 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	private static StreamExecutionEnvironment createContextEnvironment(Client client,
-			List<File> jars, int dop) {
-		return new StreamContextEnvironment(client, jars, dop);
+			List<File> jars, int parallelism) {
+		return new StreamContextEnvironment(client, jars, parallelism);
 	}
 
 	/**
 	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
 	 * will run the program in a multi-threaded fashion in the same JVM as the
-	 * environment was created in. The default degree of parallelism of the
+	 * environment was created in. The default parallelism of the
 	 * local environment is the number of hardware contexts (CPU cores /
 	 * threads), unless it was specified differently by
-	 * {@link #setDegreeOfParallelism(int)}.
+	 * {@link #setParallelism(int)}.
 	 * 
 	 * @return A local execution environment.
 	 */
 	public static LocalStreamEnvironment createLocalEnvironment() {
-		return createLocalEnvironment(defaultLocalDop);
+		return createLocalEnvironment(defaultLocalParallelism);
 	}
 
 	/**
 	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
 	 * will run the program in a multi-threaded fashion in the same JVM as the
-	 * environment was created in. It will use the degree of parallelism
+	 * environment was created in. It will use the parallelism
 	 * specified in the parameter.
 	 * 
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism for the local environment.
-	 * @return A local execution environment with the specified degree of
+	 * @param parallelism
+	 *            The parallelism for the local environment.
+	 * @return A local execution environment with the specified
 	 *         parallelism.
 	 */
-	public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
+	public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
 		currentEnvironment = new LocalStreamEnvironment();
-		currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+		currentEnvironment.setParallelism(parallelism);
 		return (LocalStreamEnvironment) currentEnvironment;
 	}
 
@@ -703,7 +703,7 @@ public abstract class StreamExecutionEnvironment {
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The
 	 * execution will use no parallelism, unless the parallelism is set
-	 * explicitly via {@link #setDegreeOfParallelism}.
+	 * explicitly via {@link #setParallelism}.
 	 * 
 	 * @param host
 	 *            The host name or address of the master (JobManager), where the
@@ -728,7 +728,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
 	 * (parts of) the program to a cluster for execution. Note that all file
 	 * paths used in the program must be accessible from the cluster. The
-	 * execution will use the specified degree of parallelism.
+	 * execution will use the specified parallelism.
 	 * 
 	 * @param host
 	 *            The host name or address of the master (JobManager), where the
@@ -736,8 +736,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @param port
 	 *            The port of the master (JobManager), where the program should
 	 *            be executed.
-	 * @param degreeOfParallelism
-	 *            The degree of parallelism to use during the execution.
+	 * @param parallelism
+	 *            The parallelism to use during the execution.
 	 * @param jarFiles
 	 *            The JAR files with code that needs to be shipped to the
 	 *            cluster. If the program uses user-defined functions,
@@ -746,9 +746,9 @@ public abstract class StreamExecutionEnvironment {
 	 * @return A remote environment that executes the program on a cluster.
 	 */
 	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
-			int degreeOfParallelism, String... jarFiles) {
+			int parallelism, String... jarFiles) {
 		currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
-		currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+		currentEnvironment.setParallelism(parallelism);
 		return currentEnvironment;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 1cff7e7..2cf5cc2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,13 +32,13 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 		super();
 		this.env = env;
 
-		int dop = env.getDegreeOfParallelism();
-		if (dop > 0) {
-			setDegreeOfParallelism(dop);
+		int parallelism = env.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		} else {
-			setDegreeOfParallelism(GlobalConfiguration.getInteger(
-					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+			setParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELISM_KEY,
+					ConfigConstants.DEFAULT_PARALLELISM));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index 9ff8a7f..947f8ab 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -68,7 +68,7 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 		// get the target stream
 		stream = target == STD_OUT ? System.out : System.err;
 		
-		// set the prefix if we have a >1 DOP
+		// set the prefix if we have a >1 parallelism
 		prefix = (context.getNumberOfParallelSubtasks() > 1) ? 
 				((context.getIndexOfThisSubtask() + 1) + "> ") : null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index ec8c226..0a423cc 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
  * Interface for a stream data source.
  *
  * <p>Sources implementing this specific interface are executed with
- * degree of parallelism 1. To execute your sources in parallel
+ * parallelism 1. To execute your sources in parallel
  * see {@link ParallelSourceFunction}.</p>
  *
  * @param <OUT> The type of the records produced by this source.

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 691b111..d04e7e6 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -37,12 +37,12 @@ public class ClusterUtil {
 	 * 
 	 * @param jobGraph
 	 *            jobGraph
-	 * @param degreeOfParallelism
+	 * @param parallelism
 	 *            numberOfTaskTrackers
 	 * @param memorySize
 	 *            memorySize
 	 */
-	public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism, long memorySize)
+	public static void runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
 			throws Exception {
 
 		Configuration configuration = jobGraph.getJobConfiguration();
@@ -50,7 +50,7 @@ public class ClusterUtil {
 		LocalFlinkMiniCluster exec = null;
 
 		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, degreeOfParallelism);
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running on mini cluster");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
index d853846..0446b61 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
@@ -27,11 +27,11 @@ import org.junit.Test;
 
 public abstract class StreamingProgramTestBase extends AbstractTestBase {
 
-	private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
+	private static final int DEFAULT_PARALLELISM = 4;
 
 	private JobExecutionResult latestExecutionResult;
 
-	private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+	private int parallelism = DEFAULT_PARALLELISM;
 
 
 	public StreamingProgramTestBase() {
@@ -40,16 +40,16 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 
 	public StreamingProgramTestBase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(degreeOfParallelism);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
-	public void setDegreeOfParallelism(int degreeOfParallelism) {
-		this.degreeOfParallelism = degreeOfParallelism;
-		setTaskManagerNumSlots(degreeOfParallelism);
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		setTaskManagerNumSlots(parallelism);
 	}
 	
-	public int getDegreeOfParallelism() {
-		return degreeOfParallelism;
+	public int getParallelism() {
+		return parallelism;
 	}
 	
 	public JobExecutionResult getLatestExecutionResult() {
@@ -86,7 +86,7 @@ public abstract class StreamingProgramTestBase extends AbstractTestBase {
 			}
 
 			// prepare the test environment
-			TestStreamEnvironment env = new TestStreamEnvironment(this.executor, this.degreeOfParallelism);
+			TestStreamEnvironment env = new TestStreamEnvironment(this.executor, this.parallelism);
 			env.setAsContext();
 
 			// call the test program

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 6e0821d..5e785f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -38,15 +38,15 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	private ForkableFlinkMiniCluster executor;
 	private boolean internalExecutor;
 
-	public TestStreamEnvironment(int degreeOfParallelism, long memorySize){
-		setDegreeOfParallelism(degreeOfParallelism);
+	public TestStreamEnvironment(int parallelism, long memorySize){
+		setParallelism(parallelism);
 		this.memorySize = memorySize;
 		internalExecutor = true;
 	}
 
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int dop){
+	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism){
 		this.executor = executor;
-		setDefaultLocalParallelism(dop);
+		setDefaultLocalParallelism(parallelism);
 	}
 
 	@Override
@@ -62,7 +62,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 			Configuration configuration = jobGraph.getJobConfiguration();
 
 			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
-					getDegreeOfParallelism());
+					getParallelism());
 			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
 
 			executor = new ForkableFlinkMiniCluster(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 59f86b8..3db6531 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -57,11 +57,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
   def getJavaStream: JavaStream[T] = javaStream
 
   /**
-   * Sets the degree of parallelism of this operation. This must be greater than 1.
+   * Sets the parallelism of this operation. This must be greater than 1.
    */
-  def setParallelism(dop: Int): DataStream[T] = {
+  def setParallelism(parallelism: Int): DataStream[T] = {
     javaStream match {
-      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
+      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
       case _ =>
         throw new UnsupportedOperationException("Operator " + javaStream.toString +  " cannot " +
           "have " +
@@ -71,7 +71,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   }
 
   /**
-   * Returns the degree of parallelism of this operation.
+   * Returns the parallelism of this operation.
    */
   def getParallelism: Int = javaStream match {
     case op: SingleOutputStreamOperator[_, _] => op.getParallelism

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 598b590..cfa7c18 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -33,9 +33,9 @@ import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.Wat
 class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
-   * Sets the degree of parallelism (DOP) for operations executed through this environment.
-   * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
-   * x parallel instances. This value can be overridden by specific operations using
+   * Sets the parallelism for operations executed through this environment.
+   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
+   * with x parallel instances. This value can be overridden by specific operations using
    * [[DataStream.setParallelism]].
    * @deprecated Please use [[setParallelism]]
    */
@@ -239,7 +239,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * same type and must be serializable.
    *
    * * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
+   * a parallelism of one.
    */
   def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
     val typeInfo = implicitly[TypeInformation[T]]
@@ -251,7 +251,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * because the framework may move the elements into the cluster if needed.
    *
    * Note that this operation will result in a non-parallel data source, i.e. a data source with
-   * a degree of parallelism of one.
+   * a parallelism of one.
    */
   def fromCollection[T: ClassTag: TypeInformation](
     data: Seq[T]): DataStream[T] = {
@@ -352,16 +352,16 @@ object StreamExecutionEnvironment {
    * of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
    */
   def createLocalEnvironment(
-    degreeOfParallelism: Int =  Runtime.getRuntime.availableProcessors()):
+    parallelism: Int =  Runtime.getRuntime.availableProcessors()):
   StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program to
    * a cluster for execution. Note that all file paths used in the program must be accessible from
-   * the cluster. The execution will use the cluster's default degree of parallelism, unless the
-   * parallelism is set explicitly via [[StreamExecutionEnvironment.setDegreeOfParallelism()]].
+   * the cluster. The execution will use the cluster's default parallelism, unless the
+   * parallelism is set explicitly via [[StreamExecutionEnvironment.setParallelism()]].
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.
@@ -380,12 +380,12 @@ object StreamExecutionEnvironment {
   /**
    * Creates a remote execution environment. The remote environment sends (parts of) the program
    * to a cluster for execution. Note that all file paths used in the program must be accessible
-   * from the cluster. The execution will use the specified degree of parallelism.
+   * from the cluster. The execution will use the specified parallelism.
    *
    * @param host The host name or address of the master (JobManager),
    *             where the program should be executed.
    * @param port The port of the master (JobManager), where the program should be executed.
-   * @param degreeOfParallelism The degree of parallelism to use during the execution.
+   * @param parallelism The parallelism to use during the execution.
    * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
    *                 program uses
    *                 user-defined functions, user-defined input formats, or any libraries,
@@ -395,10 +395,10 @@ object StreamExecutionEnvironment {
   def createRemoteEnvironment(
     host: String,
     port: Int,
-    degreeOfParallelism: Int,
+    parallelism: Int,
     jarFiles: String*): StreamExecutionEnvironment = {
     val javaEnv = JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*)
-    javaEnv.setDegreeOfParallelism(degreeOfParallelism)
+    javaEnv.setParallelism(parallelism)
     new StreamExecutionEnvironment(javaEnv)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 9396b66..fd328ae 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -41,7 +41,7 @@ import java.io.StringWriter;
 
 /**
  * This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the DOPOneExecEnv is here.
+ * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
  */
 public class HDFSTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
index e1235b6..3b2fb7f 100644
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
@@ -128,7 +128,7 @@ public class TachyonFileSystemWrapperTest {
 			addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
 			GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
 
-			new DopOneTestEnvironment(); // initialize DOP one
+			new DopOneTestEnvironment(); // initialize parallelism one
 
 			WordCount.main(new String[]{input, output});
 
@@ -157,7 +157,7 @@ public class TachyonFileSystemWrapperTest {
 				@Override
 				public ExecutionEnvironment createExecutionEnvironment() {
 					LocalEnvironment le = new LocalEnvironment();
-					le.setDegreeOfParallelism(1);
+					le.setParallelism(1);
 					return le;
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
index 788327a..435713b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
@@ -69,10 +69,10 @@ public abstract class CompilerTestBase {
 	public void setup() {
 		this.dataStats = new DataStatistics();
 		this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator());
-		this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+		this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
 		
 		this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator());
-		this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM);
+		this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index 6e86896..2214000 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -29,11 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple;
 
 public abstract class JavaProgramTestBase extends AbstractTestBase {
 
-	private static final int DEFAULT_DEGREE_OF_PARALLELISM = 4;
+	private static final int DEFAULT_PARALLELISM = 4;
 	
 	private JobExecutionResult latestExecutionResult;
 	
-	private int degreeOfParallelism = DEFAULT_DEGREE_OF_PARALLELISM;
+	private int parallelism = DEFAULT_PARALLELISM;
 
 	/**
 	 * The number of times a test should be repeated.
@@ -42,7 +42,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	 * tests repeatedly might help to discover resource leaks, race conditions etc.
 	 */
 	private int numberOfTestRepetitions = 1;
-	
+
 	private boolean isCollectionExecution;
 
 	public JavaProgramTestBase() {
@@ -51,20 +51,20 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	public JavaProgramTestBase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(degreeOfParallelism);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
-	public void setDegreeOfParallelism(int degreeOfParallelism) {
-		this.degreeOfParallelism = degreeOfParallelism;
-		setTaskManagerNumSlots(degreeOfParallelism);
+	public void setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		setTaskManagerNumSlots(parallelism);
 	}
 
 	public void setNumberOfTestRepetitions(int numberOfTestRepetitions) {
 		this.numberOfTestRepetitions = numberOfTestRepetitions;
 	}
 	
-	public int getDegreeOfParallelism() {
-		return isCollectionExecution ? 1 : degreeOfParallelism;
+	public int getParallelism() {
+		return isCollectionExecution ? 1 : parallelism;
 	}
 	
 	public JobExecutionResult getLatestExecutionResult() {
@@ -110,7 +110,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			}
 			
 			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
 			env.getConfig().enableObjectReuse();
 			env.setAsContext();
 
@@ -162,7 +162,7 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 			}
 
 			// prepare the test environment
-			TestEnvironment env = new TestEnvironment(this.executor, this.degreeOfParallelism);
+			TestEnvironment env = new TestEnvironment(this.executor, this.parallelism);
 			env.getConfig().disableObjectReuse();
 			env.setAsContext();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index 67a4797..eafe9ad 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -35,7 +35,7 @@ import org.junit.Test;
 
 public abstract class RecordAPITestBase extends AbstractTestBase {
 
-	protected static final int DOP = 4;
+	protected static final int parallelism = 4;
 	
 	protected JobExecutionResult jobExecutionResult;
 	
@@ -48,7 +48,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase {
 	
 	public RecordAPITestBase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 44f35e7..02c1434 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -39,9 +39,9 @@ public class TestEnvironment extends ExecutionEnvironment {
 	protected JobExecutionResult latestResult;
 
 
-	public TestEnvironment(ForkableFlinkMiniCluster executor, int degreeOfParallelism) {
+	public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
 		this.executor = executor;
-		setDegreeOfParallelism(degreeOfParallelism);
+		setParallelism(parallelism);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
index cd38418..1d7d206 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorITCase.java
@@ -83,7 +83,7 @@ public class AccumulatorITCase extends JavaProgramTestBase {
 		
 		Assert.assertEquals(Integer.valueOf(3), (Integer) res.getAccumulatorResult("num-lines"));
 
-		Assert.assertEquals(Double.valueOf(getDegreeOfParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
+		Assert.assertEquals(Double.valueOf(getParallelism()), (Double)res.getAccumulatorResult("open-close-counter"));
 		
 		// Test histogram (words per line distribution)
 		Map<Integer, Integer> dist = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
index daf5181..4868aff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase.java
@@ -95,7 +95,7 @@ public class BroadcastBranchingITCase extends RecordAPITestBase {
 		MapOperator mp2 = MapOperator.builder(Mp2.class).setBroadcastVariable("z", mp1).input(jn2).build();
 
 		FileDataSink output = new FileDataSink(new ContractITCaseOutputFormat(), resultPath);
-		output.setDegreeOfParallelism(1);
+		output.setParallelism(1);
 		output.setInput(mp2);
 
 		return new Plan(output);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
index 06c7506..41d24b8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarInitializationITCase.java
@@ -40,7 +40,7 @@ public class BroadcastVarInitializationITCase extends JavaProgramTestBase {
 	protected void testProgram() throws Exception {
 		
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.setDegreeOfParallelism(4);
+		env.setParallelism(4);
 		
 		DataSet<Integer> data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
index e9374b1..34d4133 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java
@@ -67,7 +67,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 
 	private static final int NUM_FEATURES = 3;
 
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
 	protected String pointsPath;
 
@@ -76,7 +76,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 	protected String resultPath;
 
 	public BroadcastVarsNepheleITCase(){
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 
@@ -131,7 +131,7 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, DOP);
+		return createJobGraphV1(this.pointsPath, this.modelsPath, this.resultPath, parallelism);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
index 8e13408..2406d6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java
@@ -60,9 +60,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 	
 	private static final int MEMORY_PER_CONSUMER = 2;
 
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
-	private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*DOP;
+	private static final double MEMORY_FRACTION_PER_CONSUMER = (double)MEMORY_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
 
 	protected String dataPath;
 	protected String clusterPath;
@@ -70,7 +70,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 
 	
 	public KMeansIterativeNepheleITCase() {
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 	@Override
@@ -87,7 +87,7 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 
 	@Override
 	protected JobGraph getJobGraph() throws Exception {
-		return createJobGraph(dataPath, clusterPath, this.resultPath, DOP, 20);
+		return createJobGraph(dataPath, clusterPath, this.resultPath, parallelism, 20);
 	}
 
 	// -------------------------------------------------------------------------------------------------------------
@@ -252,8 +252,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase {
 		return tail;
 	}
 	
-	private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int dop) {
-		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, dop);
+	private static AbstractJobVertex createSync(JobGraph jobGraph, int numIterations, int parallelism) {
+		AbstractJobVertex sync = JobGraphUtils.createSync(jobGraph, parallelism);
 		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
 		syncConfig.setNumberOfIterations(numIterations);
 		syncConfig.setIterationId(ITERATION_ID);

http://git-wip-us.apache.org/repos/asf/flink/blob/cf84bca1/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
index 9bbd282..36db34d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/MapCancelingITCase.java
@@ -33,10 +33,10 @@ import org.apache.flink.util.Collector;
 
 @SuppressWarnings("deprecation")
 public class MapCancelingITCase extends CancellingTestBase {
-	private static final int DOP = 4;
+	private static final int parallelism = 4;
 
 	public MapCancelingITCase() {
-		setTaskManagerNumSlots(DOP);
+		setTaskManagerNumSlots(parallelism);
 	}
 	
 //	@Test
@@ -51,7 +51,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 5 * 1000, 10 * 1000);
 	}
@@ -68,7 +68,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 5 * 1000, 10 * 1000);
 	}
@@ -85,7 +85,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}
@@ -102,7 +102,7 @@ public class MapCancelingITCase extends CancellingTestBase {
 		
 		
 		Plan p = new Plan(sink);
-		p.setDefaultParallelism(DOP);
+		p.setDefaultParallelism(parallelism);
 		
 		runAndCancelJob(p, 10 * 1000, 10 * 1000);
 	}


Mime
View raw message