flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-1687] [streaming] Syncing streaming sou...
Date Wed, 20 May 2015 09:24:33 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/521#discussion_r30683935
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---
    @@ -19,396 +19,867 @@
     package org.apache.flink.streaming.api.scala
     
     import com.esotericsoftware.kryo.Serializer
    +import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
     import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
    +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, TypeExtractor}
     import org.apache.flink.api.scala.ClosureCleaner
    +import org.apache.flink.streaming.api.datastream.DataStreamSource
     import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
     import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
    -import org.apache.flink.streaming.api.functions.source.{FromElementsFunction, SourceFunction}
    -import org.apache.flink.util.Collector
    +import org.apache.flink.streaming.api.functions.source._
    +import org.apache.flink.streaming.api.operators.StreamSource
    +import org.apache.flink.types.StringValue
    +import org.apache.flink.util.{Collector, SplittableIterator}
     
     import scala.reflect.ClassTag
     
     class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     
    -  /**
    -   * Sets the parallelism for operations executed through this environment.
    -   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    -   * with x parallel instances. This value can be overridden by specific operations using
    -   * [[DataStream.setParallelism]].
    -   * @deprecated Please use [[setParallelism]]
    -   */
    -  @deprecated
    -  def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
    -    javaEnv.setParallelism(degreeOfParallelism)
    -  }
    -
    -  /**
    -   * Sets the parallelism for operations executed through this environment.
    -   * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    -   * with x parallel instances. This value can be overridden by specific operations using
    -   * [[DataStream.setParallelism]].
    -   */
    -  def setParallelism(parallelism: Int): Unit = {
    -    javaEnv.setParallelism(parallelism)
    -  }
    -
    -  /**
    -   * Returns the default parallelism for this execution environment. Note that this
    -   * value can be overridden by individual operations using [[DataStream.setParallelism]]
    -   * @deprecated Please use [[getParallelism]]
    -   */
    -  @deprecated
    -  def getDegreeOfParallelism = javaEnv.getParallelism
    -
    -  /**
    -   * Returns the default parallelism for this execution environment. Note that this
    -   * value can be overridden by individual operations using [[DataStream.setParallelism]]
    -   */
    -  def getParallelism = javaEnv.getParallelism
    -
    -  /**
    -   * Sets the maximum time frequency (milliseconds) for the flushing of the
    -   * output buffers. By default the output buffers flush frequently to provide
    -   * low latency and to aid smooth developer experience. Setting the parameter
    -   * can result in three logical modes:
    -   *
    -   * <ul>
    -   * <li>
    -   * A positive integer triggers flushing periodically by that integer</li>
    -   * <li>
    -   * 0 triggers flushing after every record thus minimizing latency</li>
    -   * <li>
    -   * -1 triggers flushing only when the output buffer is full thus maximizing
    -   * throughput</li>
    -   * </ul>
    -   *
    -   */
    -  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
    -    javaEnv.setBufferTimeout(timeoutMillis)
    -    this
    -  }
    -
    -  /**
    -   * Gets the default buffer timeout set for this environment
    -   */
    -  def getBufferTimout: Long = javaEnv.getBufferTimeout()
    -
    -  /**
    -   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
    -   * operator states. Time interval between state checkpoints is specified in in millis.
    -   *
    -   * Setting this option assumes that the job is used in production and thus if not stated
    -   * explicitly otherwise with calling with the
    -   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    -   * failure the job will be resubmitted to the cluster indefinitely.
    -   */
    -  def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
    -    javaEnv.enableCheckpointing(interval)
    -    this
    -  }
    -
    -  /**
    -   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
    -   * operator states. Time interval between state checkpoints is specified in in millis.
    -   *
    -   * Setting this option assumes that the job is used in production and thus if not stated
    -   * explicitly otherwise with calling with the
    -   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    -   * failure the job will be resubmitted to the cluster indefinitely.
    -   */
    -  def enableCheckpointing() : StreamExecutionEnvironment = {
    -    javaEnv.enableCheckpointing()
    -    this
    -  }
    -  
    -  /**
    -   * Disables operator chaining for streaming operators. Operator chaining
    -   * allows non-shuffle operations to be co-located in the same thread fully
    -   * avoiding serialization and de-serialization.
    -   * 
    -   */
    -  def disableOperatorChaning(): StreamExecutionEnvironment = {
    -    javaEnv.disableOperatorChaning()
    -    this
    -  }
    -
    -  /**
    -   * Sets the number of times that failed tasks are re-executed. A value of zero
    -   * effectively disables fault tolerance. A value of "-1" indicates that the system
    -   * default value (as defined in the configuration) should be used.
    -   */
    -  def setNumberOfExecutionRetries(numRetries: Int): Unit = {
    -    javaEnv.setNumberOfExecutionRetries(numRetries)
    -  }
    -
    -  /**
    -   * Gets the number of times the system will try to re-execute failed tasks. A value
    -   * of "-1" indicates that the system default value (as defined in the configuration)
    -   * should be used.
    -   */
    -  def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
    -
    -
    -  /**
    -   * Registers the given type with the serializer at the [[KryoSerializer]].
    -   *
    -   * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    -   * because it may be distributed to the worker nodes by java serialization.
    -   */
    -  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
    -    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers the given type with the serializer at the [[KryoSerializer]].
    -   */
    -  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
    -    javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    -  }
    -
    -
    -  /**
    -   * Registers a default serializer for the given class and its sub-classes at Kryo.
    -   */
    -  def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
    -    javaEnv.addDefaultKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers a default serializer for the given class and its sub-classes at Kryo.
    -   *
    -   * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    -   * because it may be distributed to the worker nodes by java serialization.
    -   */
    -  def registerDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
    -    javaEnv.addDefaultKryoSerializer(clazz, serializer)
    -  }
    -
    -  /**
    -   * Registers the given type with the serialization stack. If the type is eventually
    -   * serialized as a POJO, then the type is registered with the POJO serializer. If the
    -   * type ends up being serialized with Kryo, then it will be registered at Kryo to make
    -   * sure that only tags are written.
    -   *
    -   */
    -  def registerType(typeClass: Class[_]) {
    -    javaEnv.registerType(typeClass)
    -  }
    -
    -  /**
    -   * Creates a DataStream that represents the Strings produced by reading the
    -   * given file line wise. The file will be read with the system's default
    -   * character set.
    -   *
    -   */
    -  def readTextFile(filePath: String): DataStream[String] =
    -    javaEnv.readTextFile(filePath)
    -
    -  /**
    -   * Creates a DataStream that contains the contents of file created while
    -   * system watches the given path. The file will be read with the system's
    -   * default character set. The user can check the monitoring interval in milliseconds,
    -   * and the way file modifications are handled. By default it checks for only new files
    -   * every 100 milliseconds.
    -   *
    -   */
    -  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType = 
    -    WatchType.ONLY_NEW_FILES): DataStream[String] =
    -    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
    -
    -  /**
    -   * Creates a new DataStream that contains the strings received infinitely
    -   * from socket. Received strings are decoded by the system's default
    -   * character set.
    -   *
    -   */
    -  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
    -    javaEnv.socketTextStream(hostname, port, delimiter)
    -
    -  /**
    -   * Creates a new DataStream that contains the strings received infinitely
    -   * from socket. Received strings are decoded by the system's default
    -   * character set, uses '\n' as delimiter.
    -   *
    -   */
    -  def socketTextStream(hostname: String, port: Int): DataStream[String] =
    -    javaEnv.socketTextStream(hostname, port)
    -
    -  /**
    -   * Creates a new DataStream that contains a sequence of numbers.
    -   *
    -   */
    -  def generateSequence(from: Long, to: Long): DataStream[Long] = {
    -    new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
    -      asInstanceOf[DataStream[Long]]
    -  }
    -
    -  /**
    -   * Creates a DataStream that contains the given elements. The elements must all be of the
    -   * same type and must be serializable.
    -   *
    -   * * Note that this operation will result in a non-parallel data source, i.e. a data source with
    -   * a parallelism of one.
    -   */
    -  def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
    -    val typeInfo = implicitly[TypeInformation[T]]
    -    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
    -  }
    -
    -  /**
    -   * Creates a DataStream from the given non-empty [[Seq]]. The elements need to be serializable
    -   * because the framework may move the elements into the cluster if needed.
    -   *
    -   * Note that this operation will result in a non-parallel data source, i.e. a data source with
    -   * a parallelism of one.
    -   */
    -  def fromCollection[T: ClassTag: TypeInformation](
    -    data: Seq[T]): DataStream[T] = {
    -    require(data != null, "Data must not be null.")
    -    val typeInfo = implicitly[TypeInformation[T]]
    -
    -    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
    -        .asJavaCollection(data))
    -        
    -    javaEnv.addSource(sourceFunction).returns(typeInfo)
    -  }
    -
    -  /**
    -   * Create a DataStream using a user defined source function for arbitrary
    -   * source functionality. By default sources have a parallelism of 1. 
    -   * To enable parallel execution, the user defined source should implement 
    -   * ParallelSourceFunction or extend RichParallelSourceFunction. 
    -   * In these cases the resulting source will have the parallelism of the environment. 
    -   * To change this afterwards call DataStreamSource.setParallelism(int)
    -   *
    -   */
    -  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
    -    require(function != null, "Function must not be null.")
    -    val cleanFun = StreamExecutionEnvironment.clean(function)
    -    val typeInfo = implicitly[TypeInformation[T]]
    -    javaEnv.addSource(cleanFun).returns(typeInfo)
    -  }
    -  
    -   /**
    -   * Create a DataStream using a user defined source function for arbitrary
    -   * source functionality.
    -   *
    -   */
    -  def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
    -    require(function != null, "Function must not be null.")
    -    val sourceFunction = new SourceFunction[T] {
    -      val cleanFun = StreamExecutionEnvironment.clean(function)
    -      override def run(out: Collector[T]) {
    -        cleanFun(out)
    -      }
    -      override def cancel() = {}
    -    }
    -    addSource(sourceFunction)
    -  }
    -
    -  /**
    -   * Triggers the program execution. The environment will execute all parts of
    -   * the program that have resulted in a "sink" operation. Sink operations are
    -   * for example printing results or forwarding them to a message queue.
    -   * <p>
    -   * The program execution will be logged and displayed with a generated
    -   * default name.
    -   *
    -   */
    -  def execute() = javaEnv.execute()
    -
    -  /**
    -   * Triggers the program execution. The environment will execute all parts of
    -   * the program that have resulted in a "sink" operation. Sink operations are
    -   * for example printing results or forwarding them to a message queue.
    -   * <p>
    -   * The program execution will be logged and displayed with the provided name
    -   *
    -   */
    -  def execute(jobName: String) = javaEnv.execute(jobName)
    -
    -  /**
    -   * Creates the plan with which the system will execute the program, and
    -   * returns it as a String using a JSON representation of the execution data
    -   * flow graph. Note that this needs to be called, before the plan is
    -   * executed.
    -   *
    -   */
    -  def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
    +	/**
    +	 * Sets the parallelism for operations executed through this environment.
    +	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    +	 * with x parallel instances. This value can be overridden by specific operations using
    +	 * [[DataStream.setParallelism]].
    +	 * @deprecated Please use [[setParallelism]]
    +	 */
    +	@deprecated
    +	def setDegreeOfParallelism(degreeOfParallelism: Int): Unit = {
    +		javaEnv.setParallelism(degreeOfParallelism)
    +	}
    +
    +	/**
    +	 * Sets the parallelism for operations executed through this environment.
    +	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run
    +	 * with x parallel instances. This value can be overridden by specific operations using
    +	 * [[DataStream.setParallelism]].
    +	 */
    +	def setParallelism(parallelism: Int): Unit = {
    +		javaEnv.setParallelism(parallelism)
    +	}
    +
    +	/**
    +	 * Returns the default parallelism for this execution environment. Note that this
    +	 * value can be overridden by individual operations using [[DataStream.setParallelism]]
    +	 * @deprecated Please use [[getParallelism]]
    +	 */
    +	@deprecated
    +	def getDegreeOfParallelism = javaEnv.getParallelism
    +
    +	/**
    +	 * Returns the default parallelism for this execution environment. Note that this
    +	 * value can be overridden by individual operations using [[DataStream.setParallelism]]
    +	 */
    +	def getParallelism = javaEnv.getParallelism
    +
    +	/**
    +	 * Sets the maximum time frequency (milliseconds) for the flushing of the
    +	 * output buffers. By default the output buffers flush frequently to provide
    +	 * low latency and to aid smooth developer experience. Setting the parameter
    +	 * can result in three logical modes:
    +	 *
    +	 * <ul>
    +	 * <li>
    +	 * A positive integer triggers flushing periodically by that integer</li>
    +	 * <li>
    +	 * 0 triggers flushing after every record thus minimizing latency</li>
    +	 * <li>
    +	 * -1 triggers flushing only when the output buffer is full thus maximizing
    +	 * throughput</li>
    +	 * </ul>
    +	 *
    +	 */
    +	def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment = {
    +		javaEnv.setBufferTimeout(timeoutMillis)
    +		this
    +	}
    +
    +	/**
    +	 * Gets the default buffer timeout set for this environment
    +	 */
    +	def getBufferTimout: Long = javaEnv.getBufferTimeout()
    +
    +	/**
    +	 * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
    +	 * operator states. Time interval between state checkpoints is specified in in millis.
    +	 *
    +	 * Setting this option assumes that the job is used in production and thus if not stated
    +	 * explicitly otherwise with calling with the
    +	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    +	 * failure the job will be resubmitted to the cluster indefinitely.
    +	 */
    +	def enableCheckpointing(interval: Long): StreamExecutionEnvironment = {
    +		javaEnv.enableCheckpointing(interval)
    +		this
    +	}
    +
    +	/**
    +	 * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
    +	 * operator states. Time interval between state checkpoints is specified in in millis.
    +	 *
    +	 * Setting this option assumes that the job is used in production and thus if not stated
    +	 * explicitly otherwise with calling with the
    +	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method in case of
    +	 * failure the job will be resubmitted to the cluster indefinitely.
    +	 */
    +	def enableCheckpointing(): StreamExecutionEnvironment = {
    +		javaEnv.enableCheckpointing()
    +		this
    +	}
    +
    +	/**
    +	 * Disables operator chaining for streaming operators. Operator chaining
    +	 * allows non-shuffle operations to be co-located in the same thread fully
    +	 * avoiding serialization and de-serialization.
    +	 *
    +	 */
    +	def disableOperatorChaning(): StreamExecutionEnvironment = {
    +		javaEnv.disableOperatorChaning()
    +		this
    +	}
    +
    +	/**
    +	 * Sets the number of times that failed tasks are re-executed. A value of zero
    +	 * effectively disables fault tolerance. A value of "-1" indicates that the system
    +	 * default value (as defined in the configuration) should be used.
    +	 */
    +	def setNumberOfExecutionRetries(numRetries: Int): Unit = {
    +		javaEnv.setNumberOfExecutionRetries(numRetries)
    +	}
    +
    +	/**
    +	 * Gets the number of times the system will try to re-execute failed tasks. A value
    +	 * of "-1" indicates that the system default value (as defined in the configuration)
    +	 * should be used.
    +	 */
    +	def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
    +
    +
    +	/**
    +	 * Registers the given type with the serializer at the [[KryoSerializer]].
    +	 *
    +	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    +	 * because it may be distributed to the worker nodes by java serialization.
    +	 */
    +	def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
    +		javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    +	}
    +
    +	/**
    +	 * Registers the given type with the serializer at the [[KryoSerializer]].
    +	 */
    +	def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
    +		javaEnv.registerTypeWithKryoSerializer(clazz, serializer)
    +	}
    +
    +
    +	/**
    +	 * Registers a default serializer for the given class and its sub-classes at Kryo.
    +	 */
    +	def registerDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) {
    +		javaEnv.addDefaultKryoSerializer(clazz, serializer)
    +	}
    +
    +	/**
    +	 * Registers a default serializer for the given class and its sub-classes at Kryo.
    +	 *
    +	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
    +	 * because it may be distributed to the worker nodes by java serialization.
    +	 */
    +	def registerDefaultKryoSerializer(clazz: Class[_], serializer: Serializer[_]): Unit = {
    +		javaEnv.addDefaultKryoSerializer(clazz, serializer)
    +	}
    +
    +	/**
    +	 * Registers the given type with the serialization stack. If the type is eventually
    +	 * serialized as a POJO, then the type is registered with the POJO serializer. If the
    +	 * type ends up being serialized with Kryo, then it will be registered at Kryo to make
    +	 * sure that only tags are written.
    +	 *
    +	 */
    +	def registerType(typeClass: Class[_]) {
    +		javaEnv.registerType(typeClass)
    +	}
    +
    +	/**
    +	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
    +	 * read with the system's default character set.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
    +	 * @return The data stream that represents the data read from the given file as text lines
    +	 */
    +	def readTextFile(filePath: String): DataStream[String] =
    +		javaEnv.readTextFile(filePath)
    +
    +	/**
    +	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
    +	 * java.nio.charset.Charset} with the given name will be used to read the files.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param charsetName
    +	 * The name of the character set used to read the file
    +	 * @return The data stream that represents the data read from the given file as text lines
    +	 */
    +	def readTextFile(filePath: String, charsetName: String): DataStream[String] =
    +		javaEnv.readTextFile(filePath, charsetName)
    +
    +	/**
    +	 * Creates a data stream that contains the contents of file created while system watches the given path. The file
    +	 * will be read with the system's default character set.
    +	 *
    +	 * @param streamPath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path/")
    +	 * @param intervalMillis
    +	 * The interval of file watching in milliseconds
    +	 * @param watchType
    +	 * The watch type of file stream. When watchType is
    +	 * { @link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#ONLY_NEW_FILES},
    +	 * the system processes only new files.
    +	 * { @link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#REPROCESS_WITH_APPENDED}
    +	 * means that the system re-processes all contents of appended file.
    +	 * { @link org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType#PROCESS_ONLY_APPENDED}
    +	 * means that the system processes only appended contents of files.
    +	 * @return The DataStream containing the given directory.
    +	 */
    +	def readFileStream(streamPath: String, intervalMillis: Long = 100, watchType: WatchType =
    +	WatchType.ONLY_NEW_FILES): DataStream[String] =
    +		javaEnv.readFileStream(streamPath, intervalMillis, watchType)
    +
    +	/**
    +	 * Creates a data stream that represents the strings produced by reading the given file line wise. This method is
    +	 * similar to {@link #readTextFile(String)}, but it produces a data stream with mutable {@link StringValue}
    +	 * objects,
    +	 * rather than Java Strings. StringValues can be used to tune implementations to be less object and garbage
    +	 * collection heavy.
    +	 * <p/>
    +	 * The file will be read with the system's default character set.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @return A data stream that represents the data read from the given file as text lines
    +	 */
    +	def readTextFileWithValue(filePath: String): DataStream[StringValue] =
    +		javaEnv.readTextFileWithValue(filePath)
    +
    +	/**
    +	 * Creates a data stream that represents the Strings produced by reading the given file line wise. This method is
    +	 * similar to {@link #readTextFile(String, String)}, but it produces a data stream with mutable {@link StringValue}
    +	 * objects, rather than Java Strings. StringValues can be used to tune implementations to be less object and
    +	 * garbage
    +	 * collection heavy.
    +	 * <p/>
    +	 * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param charsetName
    +	 * The name of the character set used to read the file
    +	 * @param skipInvalidLines
    +	 * A flag to indicate whether to skip lines that cannot be read with the given character set
    +	 * @return A data stream that represents the data read from the given file as text lines
    +	 */
    +	def readTextFileWithValue(filePath: String, charsetName: String,
    +		skipInvalidLines: Boolean): DataStream[StringValue] =
    +		javaEnv.readTextFileWithValue(filePath, charsetName, skipInvalidLines)
    +
    +	/**
    +	 * Reads the given file with the given imput format.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param inputFormat
    +	 * The input format used to create the data stream
    +	 * @return The data stream that represents the data read from the given file
    +	 */
    +	def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T],
    +		filePath: String): DataStream[T] = {
    +		javaEnv.readFile(inputFormat, filePath)
    +	}
    +
    +	/**
    +	 * Creates a data stream that represents the primitive type produced by reading the given file line wise.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param typeClass
    +	 * The primitive type class to be read
    +	 * @return A data stream that represents the data read from the given file as primitive type
    +	 */
    +	def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataStream[T] = {
    +		javaEnv.readFileOfPrimitives(filePath, typeClass)
    +	}
    +
    +	/**
    +	 * Creates a data stream that represents the primitive type produced by reading the given file in delimited way.
    +	 *
    +	 * @param filePath
    +	 * The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
    +	 * @param delimiter
    +	 * The delimiter of the given file
    +	 * @param typeClass
    +	 * The primitive type class to be read
    +	 * @return A data stream that represents the data read from the given file as primitive type.
    +	 */
    +	def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String, typeClass: Class[T]): DataStream[T] = {
    +		javaEnv.readFileOfPrimitives(filePath, delimiter, typeClass)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
    +	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
    +	 */
    +	def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.FileInputFormat[K, V],
    +		key: Class[K], value: Class[V], inputPath: String): DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +		javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A {@link
    +	 * org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
    +	 */
    +	def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.FileInputFormat[K, V],
    +		key: Class[K], value: Class[V], inputPath: String, job: org.apache.hadoop.mapred.JobConf): DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +		javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A {@link
    +	 * org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
    +	 */
    +	def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
    +		key: Class[K], value: Class[V], inputPath: String, job: org.apache.hadoop.mapreduce.Job): DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +		javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath, job)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
    +	 */
    +	def readHadoopFile[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.lib.input.FileInputFormat[K, V],
    +		key: Class[K], value: Class[V], inputPath: String): DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] = {
    +		javaEnv.readHadoopFile(mapredInputFormat, key, value, inputPath)
    +	}
    +
    +	/**
    +	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
    +	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
    +	 * initiated.
    +	 * <p/>
    +	 * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
    +	 * the socket was gracefully terminated.
    +	 *
    +	 * @param hostname
    +	 * The host name which a server socket binds
    +	 * @param port
    +	 * The port number which a server socket binds. A port number of 0 means that the port number is automatically
    +	 * allocated.
    +	 * @param delimiter
    +	 * A character which splits received strings into records
    +	 * @param maxRetry
    +	 * The maximal retry interval in seconds while the program waits for a socket that is temporarily down.
    +	 * Reconnection is initiated every second. A number of 0 means that the reader is immediately terminated,
    +	 * while
    +	 * a	negative value ensures retrying forever.
    +	 * @return A data stream containing the strings received from the socket
    +	 */
    +	def socketTextStream(hostname: String, port: Int, delimiter: Char, maxRetry: Long): DataStream[String] =
    +		javaEnv.socketTextStream(hostname, port, delimiter, maxRetry)
    +
    +	/**
    +	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
    +	 * decoded by the system's default character set. The reader is terminated immediately when the socket is down.
    +	 *
    +	 * @param hostname
    +	 * The host name which a server socket binds
    +	 * @param port
    +	 * The port number which a server socket binds. A port number of 0 means that the port number is automatically
    +	 * allocated.
    +	 * @param delimiter
    +	 * A character which splits received strings into records
    +	 * @return A data stream containing the strings received from the socket
    +	 */
    +	def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String] =
    +		javaEnv.socketTextStream(hostname, port, delimiter)
    +
    +	/**
    +	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
    +	 * decoded by the system's default character set, using'\n' as delimiter. The reader is terminated immediately when
    +	 * the socket is down.
    +	 *
    +	 * @param hostname
    +	 * The host name which a server socket binds
    +	 * @param port
    +	 * The port number which a server socket binds. A port number of 0 means that the port number is automatically
    +	 * allocated.
    +	 * @return A data stream containing the strings received from the socket
    +	 */
    +	def socketTextStream(hostname: String, port: Int): DataStream[String] =
    +		javaEnv.socketTextStream(hostname, port)
    +
    +	//  <K, V > DataStreamSource[Tuple2[K, V]] createHadoopInput(mapredInputFormat: InputFormat[K, V], key: Class[K], value: Class[V], job: JobConf) {
    +	//    val hadoopInputFormat: HadoopInputFormat[K, V] = new HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
    +	//    return createInput(hadoopInputFormat, TypeExtractor.getInputFormatTypes(hadoopInputFormat), "Hadoop " + "Input source")
    +	//  }
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
    +	 */
    +	def createHadoopInput[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapred.InputFormat[K, V],
    +		key: Class[K], value: Class[V],
    +		job: org.apache.hadoop.mapred.JobConf): DataStream[org.apache.flink.api.java.tuple.Tuple2[K, V]] =
    +		javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
    +
    +	/**
    +	 * Creates a data stream from the given {@link org.apache.hadoop.mapred.InputFormat}.
    +	 */
    +	def createHadoopInput[K, V: ClassTag : TypeInformation](mapredInputFormat: org.apache.hadoop.mapreduce.InputFormat[K, V],
    +		key: Class[K], value: Class[V], inputPath: String, job: org.apache.hadoop.mapreduce.Job) =
    +		javaEnv.createHadoopInput(mapredInputFormat, key, value, job)
    +
    +	/**
    +	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
    +	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
    +	 * program is executed.
    +	 * <p/>
    +	 * Since all data streams need specific information about their types, this method needs to determine the type of
    +	 * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
    +	 * input
    +	 * format implements the {@link org.apache.flink.api.java.typeutils .ResultTypeQueryable} interface. In the latter
    +	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils
    +	 * .ResultTypeQueryable#getProducedType()} method to determine data type produced by the input format.
    +	 *
    +	 * @param inputFormat
    +	 * The input format used to create the data stream
    +	 * @return The data stream that represents the data created by the input format
    +	 */
    +	def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
    +		javaEnv.createInput(inputFormat)
    +
    +	/**
    +	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}. The data stream will not be immediately
    +	 * created - instead, this method returns a data stream that will be lazily created from the input format once the
    +	 * program is executed.
    +	 * <p/>
    +	 * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
    +	 * return
    +	 * type cannot be determined by reflection analysis, and that do not implement the {@link
    +	 * org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
    +	 *
    +	 * @param inputFormat
    +	 * The input format used to create the data stream
    +	 * @return The data stream that represents the data created by the input format
    +	 */
    +	def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _],
    +		typeInfo: TypeInformation[T]): DataStream[T] =
    +		javaEnv.createInput(inputFormat, typeInfo)
    +
    +	//  private[flink] def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T,_],
    +	//    typeInfo: TypeInformation[T], sourceName: String): DataStream[T] = {
    +	//    val function = new FileSourceFunction[T](inputFormat, typeInfo)
    +	//    val returnStream = javaEnv.addSource(function, sourceName).returns(typeInfo)
    +	//    javaEnv.getStreamGraph.setInputFormat(returnStream.getId, inputFormat)
    +	//    returnStream
    +	//  }
    +
    +	/**
    +	 * Creates a new data stream that contains a sequence of numbers. The data stream will be created in parallel, so
    +	 * there is no guarantee about the oder of the elements.
    +	 *
    +	 * @param from
    +	 * The number to start at (inclusive)
    +	 * @param to
    +	 * The number to stop at (inclusive)
    +	 * @return A data stream, containing all number in the [from, to] interval
    +	 */
    +	def generateSequence(from: Long, to: Long): DataStream[Long] = {
    +		new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
    +			asInstanceOf[DataStream[Long]]
    +	}
    +
    +	/**
    +	 * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
    +	 * example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty. Furthermore,
    +	 * the elements must be serializable (as defined in {@link java.io.Serializable}), because the execution
    +	 * environment
    +	 * may ship the elements into the cluster.
    +	 * <p/>
    +	 * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
    +	 * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
    +	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
    +	 * <p/>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism one.
    +	 *
    +	 * @param data
    +	 * The array of elements to create the data stream from.
    +	 * @return The data stream representing the given array of elements
    +	 */
    +	def fromElements[T: ClassTag : TypeInformation](data: T*): DataStream[T] = {
    +		val typeInfo = implicitly[TypeInformation[T]]
    +		fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
    +	 * elements in
    +	 * the collection. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
    +	 * framework may move the elements into the cluster if needed.
    +	 * <p/>
    +	 * The framework will try and determine the exact type from the collection elements. In case of generic
    +	 * elements, it
    +	 * may be necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
    +	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
    +	 * <p/>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism one.
    +	 *
    +	 * @param data
    +	 * The collection of elements to create the data stream from
    +	 * @return The data stream representing the given collection
    +	 */
    +	def fromCollection[T: ClassTag : TypeInformation](
    +		data: Seq[T]): DataStream[T] = {
    +		require(data != null, "Data must not be null.")
    +		val typeInfo = implicitly[TypeInformation[T]]
    +
    +		val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
    +			.asJavaCollection(data))
    +
    +		javaEnv.addSource(sourceFunction, "Collection source").returns(typeInfo)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given non-empty collection. The type of the data stream is the type given by
    +	 * typeInfo. The elements need to be serializable (as defined by {@link java.io.Serializable}), because the
    +	 * framework may move the elements into the cluster if needed.
    +	 * <p/>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism one.
    +	 *
    +	 * @param data
    +	 * The collection of elements to create the data stream from
    +	 * @param typeInfo
    +	 * The TypeInformation for the produced data stream
    +	 * @return The data stream representing the given collection
    +	 */
    +	def fromCollection[T: ClassTag : TypeInformation](
    +		data: Seq[T], typeInfo: TypeInformation[T]): DataStream[T] = {
    +		require(data != null, "Data must not be null.")
    +
    +		val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
    +			.asJavaCollection(data))
    +
    +		javaEnv.addSource(sourceFunction, "Collection source").returns(typeInfo)
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
    +	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
    +	 * class (this is due to the fact that the Java compiler erases the generic type information).
    +	 * <p/>
    +	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
    +	 * move it
    +	 * to a remote environment, if needed.
    +	 * <p/>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism of one.
    +	 *
    +	 * @param data
    +	 * The iterator of elements to create the data stream from
    +	 * @param typeClass
    +	 * The class of the data produced by the iterator. Must not be a generic class.
    +	 * @return The data stream representing the elements in the iterator
    +	 * @see #fromCollection(java.util.Iterator, org.apache.flink.api.common.typeinfo.TypeInformation)
    +	 */
    +	def fromCollection[T: ClassTag : TypeInformation](
    +		data: Iterator[T], typeClass: Class[T]): DataStream[T] = {
    +		fromCollection(data, TypeExtractor.getForClass(typeClass))
    +	}
    +
    +	/**
    +	 * Creates a data stream from the given iterator. Because the iterator will remain unmodified until the actual
    +	 * execution happens, the type of data returned by the iterator must be given explicitly in the form of the type
    +	 * information. This method is useful for cases where the type is generic. In that case, the type class (as
    +	 * given in
    +	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
    +	 * <p/>
    +	 * The iterator must be serializable (as defined in {@link java.io.Serializable}), because the framework may
    +	 * move it
    +	 * to a remote environment, if needed.
    +	 * <p/>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism one.
    +	 *
    +	 * @param data
    +	 * The iterator of elements to create the data stream from
    +	 * @param typeInfo
    +	 * The TypeInformation for the produced data stream
    +	 * @return The data stream representing the elements in the iterator
    +	 */
    +	def fromCollection[T: ClassTag : TypeInformation](
    +		data: Iterator[T], typeInfo: TypeInformation[T]): DataStream[T] = {
    +		require(data != null, "Data must not be null.")
    +		if (!data.isInstanceOf[java.io.Serializable]) {
    +			throw new IllegalArgumentException("The iterator must be serializable.")
    +		}
    +
    +		val sourceFunction = new FromIteratorFunction[T](scala.collection.JavaConversions
    +			.asJavaIterator(data))
    +
    +		javaEnv.addSource(sourceFunction, "Collection source").returns(typeInfo)
    +	}
    +
    +	/**
    +	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
    +	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
    +	 * must be
    +	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
    +	 * elements
    +	 * into the cluster.
    +	 * <p/>
    +	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
    +	 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
    +	 * erases the generic type information).
    +	 *
    +	 * @param iterator
    +	 * The iterator that produces the elements of the data stream
    +	 * @param typeClass
    +	 * The class of the data produced by the iterator. Must not be a generic class.
    +	 * @return A data stream representing the elements in the iterator
    +	 */
    +	def fromParallelCollection[T: ClassTag : TypeInformation](iterator: SplittableIterator[T],
    +		typeClass: Class[T]): DataStream[T] = {
    +		javaEnv.fromParallelCollection(iterator, typeClass)
    +	}
    +
    +	/**
    +	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
    +	 * framework to create a parallel data stream source that returns the elements in the iterator. The iterator
    +	 * must be
    +	 * serializable (as defined in {@link java.io.Serializable}, because the execution environment may ship the
    +	 * elements
    +	 * into the cluster.
    +	 * <p/>
    +	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
    +	 * iterator must be given explicitly in the form of the type information. This method is useful for cases where the
    +	 * type is generic. In that case, the type class (as given in {@link #fromParallelCollection(SplittableIterator,
    +	 * Class)} does not supply all type information.
    +	 *
    +	 * @param iterator
    +	 * The iterator that produces the elements of the data stream
    +	 * @param typeInfo
    +	 * The TypeInformation for the produced data stream.
    +	 * @return A data stream representing the elements in the iterator
    +	 */
    +	def fromParallelCollection[T: ClassTag : TypeInformation](iterator: SplittableIterator[T],
    +		typeInfo: TypeInformation[T]): DataStream[T] = {
    +		javaEnv.fromParallelCollection(iterator, typeInfo)
    +	}
    +
    +	// private helper for passing different names
    +	private[flink] def fromParallelCollection[T: ClassTag : TypeInformation](iterator: SplittableIterator[T],
    +		typeInfo: TypeInformation[T], operatorName: String): DataStream[T] = {
    +		javaEnv.addSource(new FromIteratorFunction[T](iterator), operatorName).returns(typeInfo)
    +	}
    +
    +
    +	/**
    +	 * Ads a data source with a custom type information thus opening a {@link org.apache.flink.streaming.api
    +	 * .datastream.DataStream}. Only in very special cases does the user need to support type information. Otherwise
    +	 * use
    +	 * {@link #addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
    +	 * <p/>
    +	 * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
    +	 * implement {@link org.apache.flink.streaming.api.function.source.ParallelSourceFunction} or extend {@link
    +	 * org.apache.flink.streaming.api.function.source.RichParallelSourceFunction}. In these cases the resulting source
    +	 * will have the parallelism of the environment. To change this afterwards call {@link
    +	 * org.apache.flink.streaming.api.datastream.DataStreamSource#setParallelism(int)}
    +	 *
    +	 * @param function
    +	 * the user defined function
    +	 * @return the data stream constructed
    +	 */
    +	def addSource[T: ClassTag : TypeInformation](function: SourceFunction[T]): DataStream[T] = {
    +		require(function != null, "Function must not be null.")
    +		val cleanFun = StreamExecutionEnvironment.clean(function)
    +		javaEnv.addSource(cleanFun)
    +	}
    +
    +
    +	/**
    +	 * Ads a data source with a custom type information thus opening a {@link org.apache.flink.streaming.api
    +	 * .datastream.DataStream}. Only in very special cases does the user need to support type information. Otherwise
    +	 * use
    +	 * {@link #addSource(org.apache.flink.streaming.api.function.source.SourceFunction)}
    +	 *
    +	 * @param function
    +	 * the user defined function
    +	 * @param sourceName
    +	 * Name of the data source
    +	 * @return the data stream constructed
    +	 */
    +	def addSource[T: ClassTag : TypeInformation](function: SourceFunction[T],
    +		sourceName: String): DataStream[T] = {
    +		require(function != null, "Function must not be null.")
    +		val typeInfo: TypeInformation[T] =
    +			if (function.isInstanceOf[ResultTypeQueryable[T]]) {
    +				function.asInstanceOf[ResultTypeQueryable[T]].getProducedType
    +			}
    +			else {
    +				TypeExtractor.createTypeInfo(classOf[SourceFunction[T]], function.getClass, 0, null, null)
    +			}
    +
    +		val isParallel = function.isInstanceOf[ParallelSourceFunction[T]]
    +		val cleanFun = StreamExecutionEnvironment.clean(function)
    +		val sourceOperator = new StreamSource[T](cleanFun)
    +		new DataStreamSource[T](this.javaEnv, sourceName, typeInfo, sourceOperator,
    +			isParallel, sourceName)
    +	}
    +
    +	//	@SuppressWarnings("unchecked")
    +	//	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
    +	//		TypeInformation<OUT> typeInfo, String sourceName) {
    +	//
    +	//			if (typeInfo == null) {
    +	//				if (function instanceof GenericSourceFunction) {
    +	//					typeInfo = ((GenericSourceFunction<OUT>) function).getType();
    +	//				} else {
    +	//					typeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
    +	//					function.getClass(), 0, null, null);
    +	//				}
    +	//			}
    +	//
    +	//			boolean isParallel = function instanceof ParallelSourceFunction;
    +	//
    +	//			ClosureCleaner.clean(function, true);
    +	//			StreamOperator<OUT, OUT> sourceOperator = new StreamSource<OUT>(function);
    +	//
    +	//	return new DataStreamSource<OUT>(this, sourceName, typeInfo, sourceOperator,
    +	//		isParallel, sourceName);
    +	//	}
    +
    +	/**
    +	 * Create a DataStream using a user defined source function for arbitrary
    +	 * source functionality.
    +	 *
    +	 */
    +	def addSource[T: ClassTag : TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
    +		require(function != null, "Function must not be null.")
    +		val sourceFunction = new SourceFunction[T] {
    +			val cleanFun = StreamExecutionEnvironment.clean(function)
    +
    +			override def run(out: Collector[T]) {
    +				cleanFun(out)
    +			}
    +
    +			override def cancel() = {}
    +		}
    +		addSource(sourceFunction)
    +	}
    +
    +	/**
    +	 * Triggers the program execution. The environment will execute all parts of
    +	 * the program that have resulted in a "sink" operation. Sink operations are
    +	 * for example printing results or forwarding them to a message queue.
    +	 * <p>
    +	 * The program execution will be logged and displayed with a generated
    +	 * default name.
    +	 *
    +	 */
    +	def execute() = javaEnv.execute()
    +
    +	/**
    +	 * Triggers the program execution. The environment will execute all parts of
    +	 * the program that have resulted in a "sink" operation. Sink operations are
    +	 * for example printing results or forwarding them to a message queue.
    +	 * <p>
    +	 * The program execution will be logged and displayed with the provided name
    +	 *
    +	 */
    +	def execute(jobName: String) = javaEnv.execute(jobName)
    +
    +	/**
    +	 * Creates the plan with which the system will execute the program, and
    +	 * returns it as a String using a JSON representation of the execution data
    +	 * flow graph. Note that this needs to be called, before the plan is
    +	 * executed.
    +	 *
    +	 */
    +	def getExecutionPlan() = javaEnv.getStreamGraph.getStreamingPlanAsJSON
    --- End diff --
    
    This change is altering the indentation from spaces to tabs. In scala we are using spaces ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message