apex-commits mailing list archives

From shub...@apache.org
Subject apex-malhar git commit: APEXMALHAR-2416 Implementation of Redshift output module
Date Fri, 17 Mar 2017 11:07:20 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 7f1abca75 -> d5c24dc8e


APEXMALHAR-2416 Implementation of Redshift output module


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d5c24dc8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d5c24dc8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d5c24dc8

Branch: refs/heads/master
Commit: d5c24dc8e283688f76fd41b3a57f9a2c1464593b
Parents: 7f1abca
Author: chaitanya <chaithu@apache.org>
Authored: Wed Mar 15 16:13:39 2017 +0530
Committer: chaitanya <chaithu@apache.org>
Committed: Wed Mar 15 16:13:39 2017 +0530

----------------------------------------------------------------------
 ...stractJdbcTransactionableOutputOperator.java |   8 +-
 ...dshiftJdbcTransactionableOutputOperator.java | 296 ++++++++++++++++
 .../lib/db/redshift/RedshiftOutputModule.java   | 345 +++++++++++++++++++
 .../apex/malhar/lib/fs/s3/S3Reconciler.java     |  97 +++---
 .../malhar/lib/fs/s3/S3TupleOutputModule.java   |  32 +-
 .../RedshiftJdbcTransactionalOperatorTest.java  | 178 ++++++++++
 .../apex/malhar/lib/fs/s3/S3ReconcilerTest.java |   5 +-
 7 files changed, 899 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
index 0a7f3fd..df22e6f 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java
@@ -68,9 +68,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
 
   @Min(1)
   private int batchSize;
-  private final List<T> tuples;
+  protected final List<T> tuples;
 
-  private transient int batchStartIdx;
+  protected transient int batchStartIdx;
   private transient PreparedStatement updateCommand;
 
   @OutputPortFieldAnnotation(optional = true)
@@ -139,7 +139,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
     }
   }
 
-  private void processBatch()
+  protected void processBatch()
   {
     logger.debug("start {} end {}", batchStartIdx, tuples.size());
     try {
@@ -165,7 +165,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T>
    * @param updateCounts
    * @param commandsInBatch
    */
-  private void processUpdateCounts(int[] updateCounts, int commandsInBatch)
+  protected void processUpdateCounts(int[] updateCounts, int commandsInBatch)
   {
     if (updateCounts.length < commandsInBatch) {
       // Driver chose not to continue processing after failure.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java
b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java
new file mode 100644
index 0000000..286e852
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.db.redshift;
+
+import java.sql.BatchUpdateException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+
+/**
+ *  A concrete implementation of AbstractJdbcTransactionableOutputOperator for Redshift which takes FSRecordCompactionOperator.OutputMetaData as input.
+ *  It loads the data into the specified Redshift table from data files located either in S3 or on an Amazon EMR cluster.
+ *  Specify the bucketName property if the files are located in S3, or the emrClusterId property if they are located on EMR.
+ *  By default, files are loaded from S3 into the Redshift table. If the files are located on EMR, set the readerMode property to READ_FROM_EMR.
+ *
+ * @displayName Redshift Output Operator
+ * @category Output
+ * @tags database, jdbc, redshift
+ */
+@InterfaceStability.Evolving
+@OperatorAnnotation(partitionable = false)
+public class RedshiftJdbcTransactionableOutputOperator extends AbstractJdbcTransactionableOutputOperator<FSRecordCompactionOperator.OutputMetaData>
+{
+  private static final Logger logger = LoggerFactory.getLogger(RedshiftJdbcTransactionableOutputOperator.class);
+  protected static final String DEFAULT_REDSHIFT_DELIMITER = "|";
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretKey;
+  @NotNull
+  private String redshiftDelimiter = DEFAULT_REDSHIFT_DELIMITER;
+  private String region;
+  @NotNull
+  private RedshiftOutputModule.READER_MODE readerMode;
+  private String emrClusterId;
+  private String bucketName;
+  protected transient Statement stmt;
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    if (readerMode == RedshiftOutputModule.READER_MODE.READ_FROM_S3) {
+      Preconditions.checkNotNull(bucketName, "bucketName is required when reading from S3");
+    } else {
+      Preconditions.checkNotNull(emrClusterId, "emrClusterId is required when reading from EMR");
+    }
+    super.setup(context);
+    try {
+      stmt = store.getConnection().createStatement();
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected String getUpdateCommand()
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  // PreparedStatement is not needed for uploading data into Redshift, so nothing needs to be done in activate().
+  @Override
+  public void activate(Context.OperatorContext context)
+  {
+  }
+
+  /**
+   * Create the copy statement from the specified OutputMetaData
+   * @param data Given OutputMetaData
+   * @return the copy statement
+   */
+
+  protected String generateCopyStatement(FSRecordCompactionOperator.OutputMetaData data)
+  {
+    String file = data.getPath();
+    StringBuilder exec = new StringBuilder();
+    exec.append("COPY " + tableName + " ");
+    if (readerMode == RedshiftOutputModule.READER_MODE.READ_FROM_S3) {
+      exec.append("FROM 's3://" + bucketName + "/" + file + "' ");
+    } else {
+      exec.append("FROM 'emr://" + emrClusterId + "/" + file + "' ");
+    }
+    exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey);
+    exec.append(";aws_secret_access_key=" + secretKey + "' ");
+    if (region != null) {
+      exec.append("region '" + region + "' ");
+    }
+    exec.append("DELIMITER '" + redshiftDelimiter + "'");
+    exec.append(";");
+    return exec.toString();
+  }
+
+  @Override
+  protected void processBatch()
+  {
+    logger.debug("start {} end {}", batchStartIdx, tuples.size());
+    try {
+      for (int i = batchStartIdx; i < tuples.size(); i++) {
+        String copyStmt = generateCopyStatement(tuples.get(i));
+        stmt.addBatch(copyStmt);
+      }
+      stmt.executeBatch();
+      stmt.clearBatch();
+      batchStartIdx += tuples.size() - batchStartIdx;
+    } catch (BatchUpdateException bue) {
+      logger.error(bue.getMessage());
+      processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx);
+    } catch (SQLException e) {
+      throw new RuntimeException("processing batch", e);
+    }
+  }
+
+  @Override
+  protected void setStatementParameters(PreparedStatement statement, FSRecordCompactionOperator.OutputMetaData tuple) throws SQLException
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  /**
+   * Get the Redshift table name
+   * @return tableName
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Set the name of the table as it is stored in Redshift
+   * @param tableName table name
+   */
+  public void setTableName(@NotNull String tableName)
+  {
+    this.tableName = Preconditions.checkNotNull(tableName);
+  }
+
+  /**
+   * Get the AWS Access key
+   * @return accessKey
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the AWS Access Key
+   * @param accessKey given accessKey
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Get the AWS secret key
+   * @return secretKey
+   */
+  public String getSecretKey()
+  {
+    return secretKey;
+  }
+
+  /**
+   * Set the AWS secret key
+   * @param secretKey secretkey
+   */
+  public void setSecretKey(@NotNull String secretKey)
+  {
+    this.secretKey = Preconditions.checkNotNull(secretKey);
+  }
+
+  /**
+   * Return the delimiter character used to separate fields in the input file.
+   * @return redshiftDelimiter
+   */
+  public String getRedshiftDelimiter()
+  {
+    return redshiftDelimiter;
+  }
+
+  /**
+   * Set the delimiter character used to separate fields in the input file.
+   * @param redshiftDelimiter given redshiftDelimiter
+   */
+  public void setRedshiftDelimiter(@NotNull String redshiftDelimiter)
+  {
+    this.redshiftDelimiter = Preconditions.checkNotNull(redshiftDelimiter);
+  }
+
+  /**
+   * Get the AWS region from where the input file resides.
+   * @return region
+   */
+  public String getRegion()
+  {
+    return region;
+  }
+
+  /**
+   * Set the AWS region from where the input file resides.
+   * @param region region
+   */
+  public void setRegion(String region)
+  {
+    this.region = region;
+  }
+
+  /**
+   * Indicates whether the input files are read from S3 or EMR
+   * @return mode
+   */
+  public String getReaderMode()
+  {
+    return readerMode.toString();
+  }
+
+  /**
+   * Set the reader mode, which indicates whether the input files are read from S3 or EMR
+   * @param readerMode Type of reader mode
+   */
+  public void setReaderMode(@Pattern(regexp = "READ_FROM_S3|READ_FROM_EMR", flags = Pattern.Flag.CASE_INSENSITIVE) String readerMode)
+  {
+    this.readerMode = RedshiftOutputModule.READER_MODE.valueOf(readerMode);
+  }
+
+  /**
+   * Return the emrClusterId if the input files are located in EMR.
+   * @return emrClusterId
+   */
+  public String getEmrClusterId()
+  {
+    return emrClusterId;
+  }
+
+  /**
+   * Set the emrClusterId if the input files are located in EMR.
+   * @param emrClusterId emrClusterId
+   */
+  public void setEmrClusterId(String emrClusterId)
+  {
+    this.emrClusterId = emrClusterId;
+  }
+
+  /**
+   * Get the bucket name only if the input files are located in S3.
+   * @return bucketName.
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the bucket name only if the input files are located in S3.
+   * @param bucketName bucketName
+   */
+  public void setBucketName(String bucketName)
+  {
+    this.bucketName = bucketName;
+  }
+}
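
For illustration, a minimal sketch (not part of this commit) of how the new operator might be configured inside an application's populateDAG(); the dag variable, store, table name, credentials, bucket and file path below are placeholders:

    // Hypothetical configuration; all values are placeholders.
    RedshiftJdbcTransactionableOutputOperator redshiftOutput =
        dag.addOperator("LoadToRedshift", new RedshiftJdbcTransactionableOutputOperator());
    redshiftOutput.setTableName("sales");              // target Redshift table
    redshiftOutput.setAccessKey("ACCESS_KEY");         // credentials embedded in the generated COPY statement
    redshiftOutput.setSecretKey("SECRET_KEY");
    redshiftOutput.setReaderMode("READ_FROM_S3");      // bucketName is mandatory in this mode
    redshiftOutput.setBucketName("my-bucket");
    redshiftOutput.setStore(store);                    // a JdbcTransactionalStore pointing at the Redshift cluster

    // For an incoming OutputMetaData with path "staging/part-0", generateCopyStatement() builds roughly:
    // COPY sales FROM 's3://my-bucket/staging/part-0' CREDENTIALS 'aws_access_key_id=ACCESS_KEY;aws_secret_access_key=SECRET_KEY' DELIMITER '|';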

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java
b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java
new file mode 100644
index 0000000..7d24ef9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.db.redshift;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Pattern;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator;
+import org.apache.apex.malhar.lib.fs.s3.S3TupleOutputModule;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static org.apache.apex.malhar.lib.db.redshift.RedshiftJdbcTransactionableOutputOperator.DEFAULT_REDSHIFT_DELIMITER;
+import static org.apache.apex.malhar.lib.db.redshift.RedshiftOutputModule.READER_MODE.READ_FROM_S3;
+
+/**
+ * RedshiftOutputModule loads data into a Redshift table. Incoming data is written to intermediate rolling files on HDFS or S3,
+ * and the completed files are loaded into the Redshift table using the COPY command.
+ * By default, files are loaded from S3 into the Redshift table. If the files are located on EMR, set the readerMode property to READ_FROM_EMR.
+ *
+ */
+@InterfaceStability.Evolving
+public class RedshiftOutputModule implements Module
+{
+  @NotNull
+  private String tableName;
+  @NotNull
+  private String accessKey;
+  @NotNull
+  private String secretKey;
+  private String region;
+  private String bucketName;
+  private String directoryName;
+  private String emrClusterId;
+  @NotNull
+  private String redshiftDelimiter = DEFAULT_REDSHIFT_DELIMITER;
+  protected static enum READER_MODE
+  {
+    READ_FROM_S3, READ_FROM_EMR;
+  }
+
+  private READER_MODE readerMode = READ_FROM_S3;
+  private int batchSize = 100;
+  private Long maxLengthOfRollingFile;
+  private JdbcTransactionalStore store = new JdbcTransactionalStore();
+
+  public final transient ProxyInputPort<byte[]> input = new ProxyInputPort<byte[]>();
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    if (readerMode == READ_FROM_S3) {
+      S3TupleOutputModule.S3BytesOutputModule tupleBasedS3 = dag.addModule("S3Compaction", new S3TupleOutputModule.S3BytesOutputModule());
+      tupleBasedS3.setAccessKey(accessKey);
+      tupleBasedS3.setSecretAccessKey(secretKey);
+      tupleBasedS3.setBucketName(bucketName);
+      tupleBasedS3.setOutputDirectoryPath(directoryName);
+      if (maxLengthOfRollingFile != null) {
+        tupleBasedS3.setMaxLength(maxLengthOfRollingFile);
+      }
+      input.set(tupleBasedS3.input);
+
+      RedshiftJdbcTransactionableOutputOperator redshiftOutput = dag.addOperator("LoadToRedshift", createRedshiftOperator());
+
+      dag.addStream("load-to-redshift", tupleBasedS3.output, redshiftOutput.input);
+    } else {
+      FSRecordCompactionOperator<byte[]> hdfsWriteOperator = dag.addOperator("WriteToHDFS", new FSRecordCompactionOperator<byte[]>());
+      hdfsWriteOperator.setConverter(new GenericFileOutputOperator.NoOpConverter());
+      if (maxLengthOfRollingFile != null) {
+        hdfsWriteOperator.setMaxLength(maxLengthOfRollingFile);
+      }
+      input.set(hdfsWriteOperator.input);
+
+      RedshiftJdbcTransactionableOutputOperator redshiftOutput = dag.addOperator("LoadToRedshift", createRedshiftOperator());
+
+      dag.addStream("load-to-redshift", hdfsWriteOperator.output, redshiftOutput.input);
+    }
+  }
+
+  /**
+   * Create the RedshiftJdbcTransactionableOutputOperator instance
+   * @return RedshiftJdbcTransactionableOutputOperator object
+   */
+  protected RedshiftJdbcTransactionableOutputOperator createRedshiftOperator()
+  {
+    RedshiftJdbcTransactionableOutputOperator redshiftOutput = new RedshiftJdbcTransactionableOutputOperator();
+    redshiftOutput.setAccessKey(accessKey);
+    redshiftOutput.setSecretKey(secretKey);
+    if (bucketName != null) {
+      redshiftOutput.setBucketName(bucketName);
+    }
+    redshiftOutput.setTableName(tableName);
+    if (emrClusterId != null) {
+      redshiftOutput.setEmrClusterId(emrClusterId);
+    }
+    redshiftOutput.setReaderMode(readerMode.toString());
+    redshiftOutput.setStore(store);
+    redshiftOutput.setBatchSize(batchSize);
+    redshiftOutput.setRedshiftDelimiter(redshiftDelimiter);
+    if (region != null) {
+      redshiftOutput.setRegion(region);
+    }
+    return redshiftOutput;
+  }
+
+  /**
+   * Get the Redshift table name
+   * @return tableName
+   */
+  public String getTableName()
+  {
+    return tableName;
+  }
+
+  /**
+   * Set the name of the table as it is stored in Redshift
+   * @param tableName given tableName
+   */
+  public void setTableName(@NotNull String tableName)
+  {
+    this.tableName = Preconditions.checkNotNull(tableName);
+  }
+
+  /**
+   * Get the AWS Access key
+   * @return accessKey
+   */
+  public String getAccessKey()
+  {
+    return accessKey;
+  }
+
+  /**
+   * Set the AWS Access Key
+   * @param accessKey accessKey
+   */
+  public void setAccessKey(@NotNull String accessKey)
+  {
+    this.accessKey = Preconditions.checkNotNull(accessKey);
+  }
+
+  /**
+   * Get the AWS secret key
+   * @return secretKey
+   */
+  public String getSecretKey()
+  {
+    return secretKey;
+  }
+
+  /**
+   * Set the AWS secret key
+   * @param secretKey secretKey
+   */
+  public void setSecretKey(@NotNull String secretKey)
+  {
+    this.secretKey = Preconditions.checkNotNull(secretKey);
+  }
+
+  /**
+   * Get the AWS region from where the input file resides.
+   * @return region
+   */
+  public String getRegion()
+  {
+    return region;
+  }
+
+  /**
+   * Set the AWS region from where the input file resides.
+   * This is a mandatory property if S3/EMR and Redshift run in different regions.
+   * @param region given region
+   */
+  public void setRegion(String region)
+  {
+    this.region = region;
+  }
+
+  /**
+   * Get the bucket name only if the input files are located in S3.
+   * @return bucketName
+   */
+  public String getBucketName()
+  {
+    return bucketName;
+  }
+
+  /**
+   * Set the bucket name only if the input files are located in S3.
+   * @param bucketName bucketName
+   */
+  public void setBucketName(@NotNull String bucketName)
+  {
+    this.bucketName = Preconditions.checkNotNull(bucketName);
+  }
+
+  /**
+   * Return the directory name under S3 bucket
+   * @return directoryName
+   */
+  public String getDirectoryName()
+  {
+    return directoryName;
+  }
+
+  /**
+   * Set the directory name under S3 bucket.
+   * @param directoryName directoryName
+   */
+  public void setDirectoryName(@NotNull String directoryName)
+  {
+    this.directoryName = Preconditions.checkNotNull(directoryName);
+  }
+
+  /**
+   * Get the EMR cluster id
+   * @return emrClusterId
+   */
+  public String getEmrClusterId()
+  {
+    return emrClusterId;
+  }
+
+  /**
+   * Set the EMR cluster id
+   * @param emrClusterId emrClusterId
+   */
+  public void setEmrClusterId(@NotNull String emrClusterId)
+  {
+    this.emrClusterId = Preconditions.checkNotNull(emrClusterId);
+  }
+
+  /**
+   * Return the delimiter character used to separate fields in the input file.
+   * @return redshiftDelimiter
+   */
+  public String getRedshiftDelimiter()
+  {
+    return redshiftDelimiter;
+  }
+
+  /**
+   * Set the delimiter character used to separate fields in the input file.
+   * @param redshiftDelimiter redshiftDelimiter
+   */
+  public void setRedshiftDelimiter(@NotNull String redshiftDelimiter)
+  {
+    this.redshiftDelimiter = Preconditions.checkNotNull(redshiftDelimiter);
+  }
+
+  /**
+   * Indicates whether the input files are read from S3 or EMR
+   * @return readerMode
+   */
+  public String getReaderMode()
+  {
+    return readerMode.toString();
+  }
+
+  /**
+   * Set the reader mode, which indicates whether the input files are read from S3 or EMR
+   * @param readerMode Type of reader mode
+   */
+  public void setReaderMode(@Pattern(regexp = "READ_FROM_S3|READ_FROM_EMR", flags = Pattern.Flag.CASE_INSENSITIVE) String readerMode)
+  {
+    this.readerMode = RedshiftOutputModule.READER_MODE.valueOf(readerMode);
+  }
+
+  /**
+   * Get the size of a batch operation.
+   * @return batchSize
+   */
+  public int getBatchSize()
+  {
+    return batchSize;
+  }
+
+  /**
+   * Sets the size of a batch operation.
+   * @param batchSize batchSize
+   */
+  public void setBatchSize(int batchSize)
+  {
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Get the maximum length in bytes of a rolling file.
+   * @return maxLengthOfRollingFile
+   */
+  public Long getMaxLengthOfRollingFile()
+  {
+    return maxLengthOfRollingFile;
+  }
+
+  /**
+   * Set the maximum length in bytes of a rolling file.
+   * @param maxLengthOfRollingFile maxLengthOfRollingFile
+   */
+  public void setMaxLengthOfRollingFile(Long maxLengthOfRollingFile)
+  {
+    this.maxLengthOfRollingFile = maxLengthOfRollingFile;
+  }
+
+  /**
+   * Get the JdbcTransactionalStore of a RedshiftJdbcTransactionableOutputOperator
+   * @return JdbcTransactionalStore
+   */
+  public JdbcTransactionalStore getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Set the JdbcTransactionalStore
+   * @param store store
+   */
+  public void setStore(@NotNull JdbcTransactionalStore store)
+  {
+    this.store = Preconditions.checkNotNull(store);
+  }
+}
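
For illustration, a minimal sketch (not part of this commit) of wiring the module into an application DAG; the upstream reader operator, store and property values are placeholders. With the default readerMode of READ_FROM_S3, the module stages rolling files in the given S3 bucket before issuing the COPY:

    // Hypothetical wiring inside an application's populateDAG(); "reader" emits byte[] records.
    RedshiftOutputModule redshiftModule = dag.addModule("RedshiftOutput", new RedshiftOutputModule());
    redshiftModule.setTableName("sales");                 // target Redshift table
    redshiftModule.setAccessKey("ACCESS_KEY");            // AWS credentials (placeholders)
    redshiftModule.setSecretKey("SECRET_KEY");
    redshiftModule.setBucketName("my-bucket");            // intermediate rolling files are staged here
    redshiftModule.setDirectoryName("redshift-staging");  // directory under the bucket
    redshiftModule.setStore(store);                       // a JdbcTransactionalStore configured for the Redshift cluster
    dag.addStream("records-to-redshift", reader.output, redshiftModule.input);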

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
index 5fd19f9..0c47e26 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
@@ -35,15 +35,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.io.fs.AbstractReconciler;
 
 /**
@@ -74,9 +78,9 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
   private String bucketName;
 
   /**
-   * S3 End point
+   * S3 Region
    */
-  private String endPoint;
+  private String region;
 
   /**
    * Directory name under S3 bucket
@@ -98,10 +102,15 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
 
   private static final String TMP_EXTENSION = ".tmp";
 
+  public final transient DefaultOutputPort<FSRecordCompactionOperator.OutputMetaData> outputPort = new DefaultOutputPort<>();
+
   @Override
   public void setup(Context.OperatorContext context)
   {
     s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+    if (region != null) {
+      s3client.setRegion(Region.getRegion(Regions.fromName(region)));
+    }
     filePath = context.getValue(DAG.APPLICATION_PATH);
     try {
       fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration());
@@ -144,9 +153,8 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
         throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE");
       }
       if (fs.exists(path)) {
-        logger.debug("Trying to upload : {}", path);
-        s3client.putObject(request);
-        logger.debug("Uploading : {}", keyName);
+        PutObjectResult result = s3client.putObject(request);
+        logger.debug("File {} Uploaded at {}", keyName, result.getETag());
       }
     } catch (FileNotFoundException e) {
       logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath());
@@ -161,36 +169,48 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
   @Override
   public void endWindow()
   {
-    logger.info("in endWindow()");
     while (doneTuples.peek() != null) {
       FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll();
-      logger.debug("found metaData = {}", metaData);
-      committedTuples.remove(metaData);
-      try {
-        Path dest = new Path(metaData.getPath());
-        //Deleting the intermediate files and when writing to tmp files
-        // there can be vagrant tmp files which we have to clean
-        FileStatus[] statuses = fs.listStatus(dest.getParent());
-
-        for (FileStatus status : statuses) {
-          String statusName = status.getPath().getName();
-          if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) {
-            //a tmp file has tmp extension always preceded by timestamp
-            String actualFileName = statusName.substring(0,
-                statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
-            logger.debug("actualFileName = {}", actualFileName);
-            if (metaData.getFileName().equals(actualFileName)) {
-              logger.debug("deleting stray file {}", statusName);
-              fs.delete(status.getPath(), true);
-            }
-          } else if (statusName.equals(metaData.getFileName())) {
-            logger.info("deleting s3-compaction file {}", statusName);
+      removeIntermediateFiles(metaData);
+      /*if (outputPort.isConnected()) {
+        // Emit the meta data with S3 path
+        metaData.setPath(getDirectoryName() + Path.SEPARATOR + metaData.getFileName());
+        outputPort.emit(metaData);
+      }*/
+    }
+  }
+
+  /**
+   * Remove intermediate files
+   */
+  protected void removeIntermediateFiles(FSRecordCompactionOperator.OutputMetaData metaData)
+  {
+    logger.debug("found metaData = {}", metaData);
+    committedTuples.remove(metaData);
+    try {
+      Path dest = new Path(metaData.getPath());
+      //Deleting the intermediate files and when writing to tmp files
+      // there can be vagrant tmp files which we have to clean
+      FileStatus[] statuses = fs.listStatus(dest.getParent());
+
+      for (FileStatus status : statuses) {
+        String statusName = status.getPath().getName();
+        if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) {
+          //a tmp file has tmp extension always preceded by timestamp
+          String actualFileName = statusName.substring(0,
+              statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
+          logger.debug("actualFileName = {}", actualFileName);
+          if (metaData.getFileName().equals(actualFileName)) {
+            logger.debug("deleting stray file {}", statusName);
             fs.delete(status.getPath(), true);
           }
+        } else if (statusName.equals(metaData.getFileName())) {
+          logger.info("deleting s3-compaction file {}", statusName);
+          fs.delete(status.getPath(), true);
         }
-      } catch (IOException e) {
-        logger.error("Unable to Delete a file: {}", metaData.getFileName());
       }
+    } catch (IOException e) {
+      logger.error("Unable to Delete a file: {}", metaData.getFileName());
     }
   }
 
@@ -279,24 +299,21 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
   }
 
   /**
-   * Return the S3 End point
-   *
-   * @return S3 End point
+   * Get the AWS S3 Region
+   * @return region
    */
-  public String getEndPoint()
+  public String getRegion()
   {
-    return endPoint;
+    return region;
   }
 
   /**
-   * Set the S3 End point
-   *
-   * @param endPoint
-   *          S3 end point
+   * Set the AWS S3 Region
+   * @param region region
    */
-  public void setEndPoint(String endPoint)
+  public void setRegion(String region)
   {
-    this.endPoint = endPoint;
+    this.region = region;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
index 59cd046..7e907b1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java
@@ -52,6 +52,7 @@ import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
 public abstract class S3TupleOutputModule<INPUT> implements Module
 {
   public final transient ProxyInputPort<INPUT> input = new ProxyInputPort<INPUT>();
+  public final transient ProxyOutputPort<FSRecordCompactionOperator.OutputMetaData> output = new ProxyOutputPort<>();
 
   /**
    * AWS access key
@@ -65,9 +66,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module
   private String secretAccessKey;
 
   /**
-   * S3 End point
+   * S3 Region
    */
-  private String endPoint;
+  private String region;
   /**
    * Name of the bucket in which to upload the files
    */
@@ -144,7 +145,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module
     s3Reconciler.setAccessKey(accessKey);
     s3Reconciler.setSecretKey(secretAccessKey);
     s3Reconciler.setBucketName(bucketName);
-    s3Reconciler.setEndPoint(endPoint);
+    if (region != null) {
+      s3Reconciler.setRegion(region);
+    }
     s3Reconciler.setDirectoryName(outputDirectoryPath);
 
     S3ReconcilerQueuePartitioner<S3Reconciler> reconcilerPartitioner = new S3ReconcilerQueuePartitioner<S3Reconciler>();
@@ -157,11 +160,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module
         Arrays.asList(new StatsListener[] {reconcilerPartitioner}));
     dag.setAttribute(s3Reconciler, OperatorContext.PARTITIONER, reconcilerPartitioner);
 
-    if (endPoint != null) {
-      s3Reconciler.setEndPoint(endPoint);
-    }
     dag.addStream("write-to-s3", s3compaction.output, s3Reconciler.input);
     input.set(s3compaction.input);
+    output.set(s3Reconciler.outputPort);
   }
 
   /**
@@ -228,24 +229,21 @@ public abstract class S3TupleOutputModule<INPUT> implements Module
   }
 
   /**
-   * Return the S3 End point
-   *
-   * @return S3 End point
+   * Get the S3 Region
+   * @return region
    */
-  public String getEndPoint()
+  public String getRegion()
   {
-    return endPoint;
+    return region;
   }
 
   /**
-   * Set the S3 End point
-   *
-   * @param endPoint
-   *          S3 end point
+   * Set the AWS S3 region
+   * @param region region
    */
-  public void setEndPoint(String endPoint)
+  public void setRegion(String region)
   {
-    this.endPoint = Preconditions.checkNotNull(endPoint);
+    this.region = region;
   }
 
   /**

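For illustration, a minimal sketch (not part of this commit) of configuring the module after the endPoint-to-region change; all values are placeholders. The new output proxy port exposes the reconciler's OutputMetaData port so a downstream consumer such as the Redshift operator can be attached:

    // Hypothetical configuration inside an application's populateDAG(); values are placeholders.
    S3TupleOutputModule.S3BytesOutputModule s3Out = dag.addModule("S3Out", new S3TupleOutputModule.S3BytesOutputModule());
    s3Out.setAccessKey("ACCESS_KEY");
    s3Out.setSecretAccessKey("SECRET_KEY");
    s3Out.setBucketName("my-bucket");
    s3Out.setOutputDirectoryPath("staging");
    s3Out.setRegion("us-west-2");   // optional; replaces the removed endPoint property
    dag.addStream("uploaded-files", s3Out.output, downstream.input);   // "downstream" accepts OutputMetaData tuples
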
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java
new file mode 100644
index 0000000..3d4496c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.db.redshift;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
+import org.apache.commons.io.FileUtils;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
+import static org.mockito.Mockito.when;
+
+public class RedshiftJdbcTransactionalOperatorTest
+{
+  private String inputDir;
+  private static final String FILE_1 = "file1.txt";
+  private static final String FILE_2 = "file2.txt";
+  private static final String FILE_1_DATA = "460|xkalk|665\n950|xkalk|152\n850|xsblk|252";
+  private static final String FILE_2_DATA = "640|xkalk|655\n50|bcklk|52";
+  private static FSRecordCompactionOperator.OutputMetaData file1Meta;
+  private static FSRecordCompactionOperator.OutputMetaData file2Meta;
+  private static List<FSRecordCompactionOperator.OutputMetaData> listOfFiles = new ArrayList<>();
+  private static List<String> data = new ArrayList<>();
+
+  public static class TestMeta extends TestWatcher
+  {
+    public String baseDirectory;
+    Context.OperatorContext context;
+    @Mock
+    public Statement statement;
+    @Mock
+    public JdbcTransactionalStore store;
+    @Mock
+    public Connection conn;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      MockitoAnnotations.initMocks(this);
+
+      try {
+        when(store.getConnection()).thenReturn(conn);
+        when(conn.createStatement()).thenReturn(statement);
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File(baseDirectory));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Before
+  public void setup() throws Exception
+  {
+    inputDir = testMeta.baseDirectory + File.separator + "input";
+
+    File file1 = new File(inputDir + File.separator + FILE_1);
+    file1Meta = new FSRecordCompactionOperator.OutputMetaData(file1.getPath(), file1.getName(), file1.length());
+    FileUtils.writeStringToFile(file1, FILE_1_DATA);
+
+    File file2 = new File(inputDir + File.separator + FILE_2);
+    file2Meta = new FSRecordCompactionOperator.OutputMetaData(file2.getPath(), file2.getName(), file2.length());
+    FileUtils.writeStringToFile(file2, FILE_2_DATA);
+  }
+
+  @Test
+  public void TestBatchData() throws SQLException, IOException
+  {
+    RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
+    operator.setReaderMode("READ_FROM_S3");
+    operator.setStore(testMeta.store);
+    operator.setAccessKey("accessKey");
+    operator.setSecretKey("secretKey");
+    operator.setBucketName("bucketName");
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+    attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDirectory);
+    testMeta.context = mockOperatorContext(1, attributeMap);
+
+    operator.setup(testMeta.context);
+    operator.beginWindow(1);
+    operator.input.process(file1Meta);
+    operator.input.process(file2Meta);
+    when(testMeta.statement.executeBatch()).thenReturn(executeBatch());
+    operator.endWindow();
+    Assert.assertEquals("Number of tuples in database", 5, data.size());
+  }
+
+  public int[] executeBatch() throws IOException
+  {
+    for (FSRecordCompactionOperator.OutputMetaData metaData: listOfFiles) {
+      data.addAll(FileUtils.readLines(new File(metaData.getPath())));
+    }
+    return null;
+  }
+
+  @Test
+  public void VerifyS3Properties()
+  {
+    RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
+    operator.setReaderMode("READ_FROM_S3");
+    operator.setAccessKey("accessKey");
+    operator.setSecretKey("secretKey");
+    operator.setBucketName("bucketName");
+
+    Assert.assertNotNull(operator.getBucketName());
+  }
+
+  @Test
+  public void VerifyEMRProperties()
+  {
+    RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator();
+    operator.setReaderMode("READ_FROM_EMR");
+    operator.setAccessKey("accessKey");
+    operator.setSecretKey("secretKey");
+    operator.setEmrClusterId("emrClusterId");
+
+    Assert.assertNotNull(operator.getEmrClusterId());
+  }
+
+  public static class RedshiftJdbcTransactionableTestOutputOperator extends RedshiftJdbcTransactionableOutputOperator
+  {
+    @Override
+    public void processTuple(FSRecordCompactionOperator.OutputMetaData tuple)
+    {
+      super.processTuple(tuple);
+      listOfFiles.add(tuple);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
index c276df7..9023b5c 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -78,7 +79,9 @@ public class S3ReconcilerTest
       underTest.setup(context);
 
       MockitoAnnotations.initMocks(this);
-      when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(null);
+      PutObjectResult result = new PutObjectResult();
+      result.setETag(outputPath);
+      when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(result);
       underTest.setS3client(s3clientMock);
     }
 

