flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-2622][streaming]add WriteMode for write...
Date Tue, 24 Nov 2015 15:31:10 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1098#discussion_r45749794
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
        */
       def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
         javaStream.writeAsText(path, millis)
     
       /**
    +   * Writes a DataStream to the file specified by path in text format. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode)
    +    } else {
    +      javaStream.writeAsText(path)
    +    }
    +  }
    +
    +  /**
        * Writes a DataStream to the file specified by path in text format. The
        * writing is performed periodically, in every millis milliseconds. For
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode,
    +       millis: Long): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode, millis)
    +    } else {
    +      javaStream.writeAsText(path, millis)
    +    }
    +  }
    +
    +  /**
    +   * Writes a DataStream to the file specified by path in csv format. The
    +   * writing is performed periodically, in every millis milliseconds. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   */
    +  def writeAsCsv(
    +      path: String,
    +      millis: Long = 0): DataStreamSink[T] = {
    +    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    +    val of = new ScalaCsvOutputFormat[Product](
    +      new Path(path),
    +      ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    +      ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    +    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
    +  }
    +
    +  /**
    +   * Writes a DataStream to the file specified by path in csv format. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @return the closed DataStream
    +   */
    +  def writeAsCsv(
    +      path: String,
    +      writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    +    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    +    val of = new ScalaCsvOutputFormat[Product](
    +      new Path(path),
    +      ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    +      ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    +    if (writeMode != null) {
    +      of.setWriteMode(writeMode)
    +    }
    +    javaStream.write(of.asInstanceOf[OutputFormat[T]], 0L)
    --- End diff --
    
    Same here: call the most generic method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message