apex-users mailing list archives

From Munagala Ramanath <...@datatorrent.com>
Subject Enriching tuples (Was: Re: Information Needed)
Date Wed, 08 Jun 2016 18:03:03 GMT
In the most recent release (3.4.0), there is an enrichment operator. Please
take a look
at
https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java

It has a configurable BackendLoader for retrieving data from the enrichment
DB and a couple
of concrete implementations for files or JDBC.
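
For illustration, a minimal wiring sketch (not taken from this thread; the loader class, setter names, port names and the driver/URL/table values are assumptions to be verified against the 3.4.0 javadoc for the enrich package):

import java.util.Arrays;

import com.datatorrent.api.DAG;
import com.datatorrent.contrib.enrich.JDBCLoader;
import com.datatorrent.contrib.enrich.POJOEnricher;

public class EnrichmentWiringSketch
{
  public void populate(DAG dag)
  {
    POJOEnricher enricher = dag.addOperator("Enrich", POJOEnricher.class);

    // JDBC-backed BackendLoader; driver, URL and table are placeholders.
    JDBCLoader backend = new JDBCLoader();
    backend.setDatabaseDriver("com.vertica.jdbc.Driver");
    backend.setDatabaseUrl("jdbc:vertica://dbhost:5433/alerts");
    backend.setTableName("CLIENT_DIM");
    enricher.setStore(backend);

    // Key field(s) of the incoming POJO and the columns to copy from the lookup row.
    enricher.setLookupFields(Arrays.asList("clientNo"));
    enricher.setIncludeFields(Arrays.asList("clientName"));

    // "reader" and "writer" stand for the surrounding operators in the DAG, e.g.:
    // dag.addStream("alerts", reader.outputPort, enricher.input);
    // dag.addStream("enriched", enricher.output, writer.input);
  }
}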

How big is the enrichment DB likely to be?

Ram

On Wed, Jun 8, 2016 at 7:39 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
suryavamshivardhan.mukkamula@rbc.com> wrote:

> Hi Ram,
>
>
>
> Can you please suggest whether we can read data from Hive tables in the
> DataTorrent workflow?
>
>
>
> My use case is that I have to read the data from a Vertica database and, as I
> read the alerts one by one, the next operator has to enrich each alert by
> reading another source (Hive/Vertica/HBase).
>
>
>
> The question for me is: while I am reading the alerts as a continuous flow, I
> need to talk to another source on the fly to enrich the data. What would be
> the best solution for that: should I use Hive, Vertica (any database), or
> HBase?
>
>
>
> Each day I have to process around 20k alerts in my batch job.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 03 6:19 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> One way to do this is to have a new operator which gets information about
> which alerts to update from the input operator; it saves this info but does
> not act on it until it gets a trigger from the operator that does the final
> write to HDFS (so it will have 2 input ports).
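>
> A rough sketch of such a two-port gate operator (tuple types, port names and
> the downstream update step are assumptions, not code from this thread):
>
> import java.util.ArrayList;
> import java.util.List;
>
> import com.datatorrent.api.DefaultInputPort;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.common.util.BaseOperator;
>
> public class AlertUpdateGate extends BaseOperator {
>     // keys of alerts that were read but whose HDFS write is not yet confirmed
>     private final List<String> pendingKeys = new ArrayList<>();
>
>     // released keys go to whatever operator issues the UPDATE against Vertica
>     public final transient DefaultOutputPort<String> readyToUpdate = new DefaultOutputPort<>();
>
>     // port 1: alert keys coming from the input (read) side of the DAG
>     public final transient DefaultInputPort<String> alertKeys = new DefaultInputPort<String>() {
>         @Override
>         public void process(String key) {
>             pendingKeys.add(key);          // remember, but do not act yet
>         }
>     };
>
>     // port 2: trigger emitted by the operator that completes the HDFS write
>     public final transient DefaultInputPort<Object> hdfsWriteDone = new DefaultInputPort<Object>() {
>         @Override
>         public void process(Object trigger) {
>             for (String key : pendingKeys) {
>                 readyToUpdate.emit(key);
>             }
>             pendingKeys.clear();
>         }
>     };
> }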
>
>
>
> Ram
>
>
>
> On Fri, Jun 3, 2016 at 12:05 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
>
>
> Thank you Ram,  I need another suggestion. Please help.
>
>
>
> I am reading alert data from a Vertica database using a DataTorrent workflow;
> once the data is enriched and written to a file, we need to update the alert
> that we read from Vertica.
>
>
>
> In our current read operator we are reading the data by keeping it in a
> buffered queue (I guess this was your suggestion when you were with us). My
> question is: can we update the alert back in Vertica once it is passed to the
> file system on HDFS? We need this because we do not want to read the same
> alert the next day.
>
>
>
> If you have any better way please let us know.
>
>
>
> The thread class which reads the data from the database is below, FYR.
>
>
>
> package com.rbc.aml.cnscan.utils;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.concurrent.ConcurrentLinkedQueue;
>
> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.ResultSet;
> import java.sql.ResultSetMetaData;
> import java.sql.SQLException;
> import java.sql.Statement;
>
> public class JDBCQueueWriter extends Thread {
>     private static final Logger LOG = LoggerFactory.getLogger(JDBCQueueWriter.class);
>
>     private int offset;
>     protected String jdbcDriver;
>     protected String url;
>     protected String user;
>     protected String password;
>     protected String sql;
>     ConcurrentLinkedQueue<Object[]> bufferQueue;
>
>     public JDBCQueueWriter() {
>         super();
>     }
>
>     public JDBCQueueWriter(String jdbcDriver, String url, String user, String password, String sql,
>                            ConcurrentLinkedQueue<Object[]> bufferQueue, int offset) {
>         super();
>         this.jdbcDriver = jdbcDriver;
>         this.url = url;
>         this.user = user;
>         this.password = password;
>         this.sql = sql;
>         this.bufferQueue = bufferQueue;
>         this.offset = offset;
>     }
>
>     @Override
>     public void run() {
>         if (sql != null && sql.length() > 0) {
>             try {
>                 Class.forName(jdbcDriver);
>             } catch (ClassNotFoundException e) {
>                 LOG.error(e.getMessage());
>             }
>
>             try (Connection con = DriverManager.getConnection(url, user, password);
>                  Statement stmt = con.createStatement()) {
>                 ResultSet rs = stmt.executeQuery(sql);
>
>                 // skip past the rows that have already been processed
>                 for (int i = 0; i < offset && rs.next(); i++);
>
>                 ResultSetMetaData metadata = rs.getMetaData();
>                 int columnCount = metadata.getColumnCount();
>                 while (rs.next()) {
>                     Object[] values = new Object[columnCount];
>                     for (int i = 0; i < columnCount; i++) {
>                         values[i] = rs.getObject(i + 1);
>                         if (values[i] instanceof String) {
>                             values[i] = ((String) values[i]).trim();
>                         }
>                     }
>                     bufferQueue.add(values);
>                 }
>                 rs.close();
>             } catch (SQLException e) {
>                 LOG.error(e.getMessage());
>             } finally {
>                 // sending an ending signal
>                 bufferQueue.add(new Object[0]);
>             }
>         }
>     }
> }
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, June, 02 5:31 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> One possible interim approach is to copy over the relevant code from Hadoop
> 2.8 into your Apex application and see if it compiles and runs properly with
> the "sftp://" protocol prefix.
>
>
>
> It would need a bit of effort to test and validate, but it might be worth it.
>
>
>
> Ram
>
>
>
> On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <thomas.weise@gmail.com>
> wrote:
>
> I created a JIRA to track this:
>
>
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2109
>
>
>
> Hadoop 2.8 with the existing Malhar file operators could be a solution.
>
>
>
>
>
> On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Sorry, I understand that the SFTP option is not yet available in DataTorrent.
> I am assuming that I should create the files on HDFS and then have a script
> SFTP the files to another server, as per Ram's suggestion.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, June, 01 4:55 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you Ram and Devendra,
>
>
>
> I could read the DB with parallel queries.
>
>
>
> In the DataTorrent class APIs I see only AbstractFTPInputOperator, but I did
> not see an AbstractFTPOutputOperator.
>
>
>
> Can I use AbstractFileOutputOperator to FTP the output file to a different
> server (outside the Hadoop cluster)? Can you please suggest how I would do
> that?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com
> <devendrat@datatorrent.com>]
> *Sent:* 2016, May, 31 6:39 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Could you please bump up the malhar version to 3.4.0 since KryoCloneUtils
> is a part of that release.
>
>
>
> <groupId>org.apache.apex</groupId>
>
> <artifactId>malhar</artifactId>
>
> <version>3.4.0</version>
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> I already have the dependency below added within my POM, but I still cannot
> get that class. Below are my dependencies.
>
>
>
> <properties>
>   <!-- change this if you desire to use a different version of DataTorrent -->
>   <datatorrent.version>3.1.1</datatorrent.version>
>   <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
> </properties>
>
> <dependencies>
>   <!-- add your dependencies here -->
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>malhar-library</artifactId>
>     <version>${datatorrent.version}</version>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-common</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>malhar-library</artifactId>
>     <version>${datatorrent.version}</version>
>     <!-- If you know that your application does not need transitive dependencies
>          pulled in by malhar-library, uncomment the following to reduce the size of
>          your app package. -->
>     <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
>          </exclusion> </exclusions> -->
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-engine</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>test</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-common</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.teradata.jdbc</groupId>
>     <artifactId>terajdbc4</artifactId>
>     <version>14.00.00.21</version>
>   </dependency>
>   <dependency>
>     <groupId>com.teradata.jdbc</groupId>
>     <artifactId>tdgssconfig</artifactId>
>     <version>14.00.00.21</version>
>   </dependency>
>   <dependency>
>     <groupId>com.ibm.db2</groupId>
>     <artifactId>db2jcc</artifactId>
>     <version>123</version>
>   </dependency>
>   <dependency>
>     <groupId>jdk.tools</groupId>
>     <artifactId>jdk.tools</artifactId>
>     <version>1.8</version>
>     <scope>system</scope>
>     <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.apex</groupId>
>     <artifactId>malhar-contrib</artifactId>
>     <version>3.2.0-incubating</version>
>     <!--<scope>provided</scope> -->
>     <exclusions>
>       <exclusion>
>         <groupId>*</groupId>
>         <artifactId>*</artifactId>
>       </exclusion>
>     </exclusions>
>   </dependency>
>   <dependency>
>     <groupId>junit</groupId>
>     <artifactId>junit</artifactId>
>     <version>4.10</version>
>     <scope>test</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.vertica</groupId>
>     <artifactId>vertica-jdbc</artifactId>
>     <version>7.2.1-0</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.hbase</groupId>
>     <artifactId>hbase-client</artifactId>
>     <version>1.1.2</version>
>   </dependency>
>   <dependency>
>     <groupId>org.slf4j</groupId>
>     <artifactId>slf4j-log4j12</artifactId>
>     <version>1.7.19</version>
>   </dependency>
>   <dependency>
>     <groupId>com.datatorrent</groupId>
>     <artifactId>dt-engine</artifactId>
>     <version>${datatorrent.version}</version>
>     <scope>test</scope>
>   </dependency>
>   <dependency>
>     <groupId>net.sf.flatpack</groupId>
>     <artifactId>flatpack</artifactId>
>     <version>3.4.2</version>
>   </dependency>
>   <dependency>
>     <groupId>org.jdom</groupId>
>     <artifactId>jdom</artifactId>
>     <version>1.1.3</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.poi</groupId>
>     <artifactId>poi-ooxml</artifactId>
>     <version>3.9</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.xmlbeans</groupId>
>     <artifactId>xmlbeans</artifactId>
>     <version>2.3.0</version>
>   </dependency>
>   <dependency>
>     <groupId>dom4j</groupId>
>     <artifactId>dom4j</artifactId>
>     <version>1.6.1</version>
>   </dependency>
>   <dependency>
>     <groupId>javax.xml.stream</groupId>
>     <artifactId>stax-api</artifactId>
>     <version>1.0-2</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.poi</groupId>
>     <artifactId>poi</artifactId>
>     <version>3.9</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.poi</groupId>
>     <artifactId>poi-ooxml-schemas</artifactId>
>     <version>3.9</version>
>   </dependency>
>   <dependency>
>     <groupId>com.jcraft</groupId>
>     <artifactId>jsch</artifactId>
>     <version>0.1.53</version>
>   </dependency>
>   <dependency>
>     <groupId>com.jcraft</groupId>
>     <artifactId>jsch</artifactId>
>     <version>0.1.53</version>
>   </dependency>
> </dependencies>
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 31 12:47 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Both of them are in malhar-library and should be pulled in once you include
> apex and apex-malhar. Can you check if adding malhar works?
>
>
>
>  <parent>
>
>     <groupId>org.apache.apex</groupId>
>
>     <artifactId>malhar</artifactId>
>
>     <version>3.5.0-SNAPSHOT</version>
>
>   </parent>
>
>
>
>   <artifactId>malhar-library</artifactId>
>
>   <packaging>jar</packaging>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> JdbcPollInputOperator serves my purpose of a DB pull with partitions. Can
> you please help me with the POM dependency for the below classes?
>
>
>
> import org.apache.apex.malhar.lib.wal.WindowDataManager;
>
> import com.datatorrent.lib.util.KryoCloneUtils;
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 30 8:28 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> In the definePartitions method above, can you check if the store object from
> AbstractStoreInputOperator is initialized? You can use the Java bean syntax
> to set properties.
>
> The store properties to check are,
>
>
>
> store.getDatabaseDriver();
>
> store.getDatabaseUrl();
>
> store.getConnectionProperties();
>
>
>
> Also, you would need to call store.connect inside definePartitions
> before adding a new partition.
>
> Can you check the same and post the stack trace if it does not work? The
> current logger is pointing to an unused class; could you please edit it to
> the below:
>
>
>
>  private static final Logger LOG =
> LoggerFactory.getLogger(ParallelJdbcInputOperator.class);
>
>
>
> You can check an open PR for doing parallel, idempotent reads from JDBC for
> reference: https://github.com/apache/incubator-apex-malhar/pull/282/commits
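>
> For example, the loop body of the definePartitions shown further down could
> set up each partition's store roughly like this (driver, URL and credential
> values are placeholders; check JdbcStore for the exact connection-properties
> format):
>
>     ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
>     readOperator.setQuery(Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i)));
>
>     JdbcStore store = new JdbcStore();
>     store.setDatabaseDriver("com.vertica.jdbc.Driver");           // placeholder values
>     store.setDatabaseUrl("jdbc:vertica://dbhost:5433/alerts");
>     store.setConnectionProperties("user:dbuser,password:dbpass");
>     readOperator.setStore(store);
>     store.connect();                                              // connect before adding the partition
>
>     newPartitions.add(new DefaultPartition<ParallelJdbcInputOperator>(readOperator));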
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Ram,
>
>
>
> Thank you, I could check the logs on the server. The error that I am
> getting is that the store object is not set on the individual partitions. I
> tried setting the store in the definePartitions method, but no luck. Below is
> my code; can you please suggest a way forward?
>
> (I am trying to create parallel processing and define separate queries
> for each partition so that data can be fetched in parallel.)
>
>
>
>
> ######################Operator######################################################
>
>
>
> package com.rbc.aml.cnscan.operator;
>
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Map;
>
> import javax.validation.constraints.NotNull;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.DefaultPartition;
> import com.datatorrent.api.Partitioner;
> import com.datatorrent.netlet.util.DTThrowable;
> import com.rbc.aml.cnscan.utils.Constants;
>
> public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
>         implements Partitioner<ParallelJdbcInputOperator> {
>     private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
>     private transient PreparedStatement preparedStatement;
>     private ResultSet resultSet = null;
>     private String query;
>     private long queryInterval = 50000;
>     private long nextQueryTime = 0;
>
>     public ParallelJdbcInputOperator() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>     }
>
>     @Override
>     public void emitTuples() {
>         if (resultSet == null) {
>             String query = queryToRetrieveData();
>             if (query != null) {
>                 LOG.info(String.format("select statement: %s", query));
>                 try {
>                     queryStatement.setQueryTimeout(120);
>                     resultSet = queryStatement.executeQuery(query);
>                 } catch (SQLException ex) {
>                     store.disconnect();
>                     throw new RuntimeException(String.format("Error while running query: %s", query), ex);
>                 }
>             }
>         }
>
>         if (resultSet != null) {
>             try {
>                 boolean hasNext;
>                 for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
>                     Object tuple = getTuple(resultSet);
>                     if (tuple != null) {
>                         outputPort.emit(tuple);
>                     }
>                 }
>                 if (!hasNext) {
>                     resultSet.close();
>                     resultSet = null;
>                 }
>             } catch (SQLException ex) {
>                 store.disconnect();
>                 throw new RuntimeException(String.format("Error while running retriving data"), ex);
>             }
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             sb.append(result.getString(1));
>             sb.append(",");
>             sb.append(result.getString(2));
>             sb.append(",");
>             sb.append(result.getString(3));
>             sb.append(",");
>             sb.append(result.getString(4));
>             sb.append(",");
>             sb.append(result.getString(5));
>             System.out.println("tuple value" + sb.toString());
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         if (System.currentTimeMillis() < nextQueryTime) {
>             return null;
>         }
>         nextQueryTime = System.currentTimeMillis() + queryInterval;
>         return query;
>     }
>
>     public String getQuery() {
>         return query;
>     }
>
>     public void setQuery(String query) {
>         this.query = query;
>     }
>
>     protected int emitBatchSize = 3000;
>
>     public int getEmitBatchSize() {
>         return emitBatchSize;
>     }
>
>     public void setEmitBatchSize(int emitBatchSize) {
>         this.emitBatchSize = emitBatchSize;
>     }
>
>     @Override
>     public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
>             Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
>             com.datatorrent.api.Partitioner.PartitioningContext context) {
>         int partitionSize;
>         ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();
>
>         partitionSize = Constants.SOURCE_CODE_LIST.size();
>
>         for (int i = 0; i < partitionSize; i++) {
>             try {
>                 ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
>                 String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
>                 readOperator.setQuery(newQuery);
>                 System.out.println("The New Query is:" + readOperator.getQuery());
>                 JdbcStore store = new JdbcStore();
>                 readOperator.setStore(store);
>                 Partition<ParallelJdbcInputOperator> partition = new DefaultPartition<ParallelJdbcInputOperator>(readOperator);
>                 newPartitions.add(partition);
>             } catch (Throwable ex) {
>                 DTThrowable.rethrow(ex);
>             }
>         }
>
>         return newPartitions;
>     }
>
>     @Override
>     public void partitioned(
>             Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
>         // TODO Auto-generated method stub
>     }
> }
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 30 2:42 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You'll need to find which node is running YARN (Resource Manager) and look
> for log files on that machine. Usually, they are under /var/log/hadoop-yarn
> or /var/log/hadoop or similar locations.
>
>
>
> The files themselves will have names that vary depending on your
> installation; some examples:
>
> yarn-<user>-resourcemanager-<host>.log
>
> hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out
>
>
>
> Look for lines similar to the following that reference the failed
> application id; they will tell you what containers were allocated on which
> nodes on behalf of this application. You may then have to ssh into those
> nodes and check the specific container logs for more specific information on
> why it may have failed.
>
>
>
> Ram
>
> -----------------------------------
>
>
>
> 2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user> OPERATION=AM Allocated Container TARGET=SchedulerApp RESULT=SUCCESS APPID=application_1462948052533_0036 CONTAINERID=container_1462948052533_0036_01_022468
>
> 2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> on host <host:port>, which has 9 containers, <memory:24064, vCores:9> used and <memory:180736, vCores:15> available after allocation
>
> 2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1462948052533_0183_01_036933 Container Transitioned from ALLOCATED to ACQUIRED
>
>
>
>
>
> On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a Vertica database table with partitioning; I tried my
> hand at partitioning by overriding the definePartitions method. My launch
> succeeds, but in the Monitor tab the job is in a failed state, and I cannot
> see the logs for the failed job on the DT web console.
>
>
>
> Is there any way that I can view the logs on the UNIX machine for the
> failed jobs?
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR)
> *Sent:* 2016, May, 27 9:33 AM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Hi Ram,
>
>
>
> Thank you so much, it worked !!
>
>
>
> I am done with reading and parsing a single input feed using a
> configuration file.
>
>
>
> I would like to do this for 100 feeds and 100 configuration files using
> partitioning. I guess I have to know how to set an individual feed directory
> and configuration file per partition, if I am not wrong.
>
>
>
> While I wait for your sample code for partitioning, I will meanwhile
> try to understand partitioning.
>
>
>
> Your support is well appreciated.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com
> <ram@datatorrent.com>]
> *Sent:* 2016, May, 26 7:32 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> You need to return null from readEntity() when br.readLine() returns null
> to signal that EOF is reached.
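>
> Applied to the readEntity() in the operator below, the start of the method
> would look roughly like this (sketch only):
>
>     @Override
>     protected ClientRecord readEntity() throws IOException {
>         String line = br.readLine();
>         if (line == null) {
>             return null;   // EOF: tells AbstractFileInputOperator this file is finished
>         }
>         // ... existing parsing logic unchanged ...
>     }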
>
>
>
> Ram
>
>
>
> On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Priyanka,
>
>
>
> There is only a single file in the directory and there are no external
> updates. The same code was working for a simple file read from HDFS, but when
> I added the parsing part it goes into an infinite loop.
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Priyanka Gugale [mailto:priyanka@datatorrent.com]
> *Sent:* 2016, May, 26 5:03 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> There is a setting called "scanIntervalMillis" that keeps scanning the
> input directory for newly added files. In your case, are new files
> getting added to the directory? Or is the input file's timestamp getting
> updated?
>
>
>
> -Priyanka
>
>
>
> On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> I am trying to read a file from HDFS, parse it using an XML configuration
> file, and print it on the console. The issue I am facing is that the file
> read goes into an infinite loop; I am not sure how to make it read the file
> only once. Please help.
>
>
>
> My Operator code:
>
>
>
> package com.rbc.aml.cnscan.operator;
>
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
> import com.datatorrent.netlet.util.DTThrowable;
> import com.rbc.aml.cnscan.utils.ClientRecord;
>
> import net.sf.flatpack.DataSet;
> import net.sf.flatpack.DefaultParserFactory;
> import net.sf.flatpack.Parser;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.StringReader;
>
> import javax.validation.constraints.NotNull;
>
> public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
>     private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);
>
>     protected transient BufferedReader br;
>     protected String fileName;
>     public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();
>
>     @NotNull
>     private String configFile = null;
>
>     public String getConfigFile() {
>         return configFile;
>     }
>
>     public void setConfigFile(String file) {
>         configFile = file;
>     }
>
>     @Override
>     protected InputStream openFile(Path path) throws IOException {
>         InputStream is = super.openFile(path);
>         fileName = path.getName();
>         System.out.println("input file is" + fileName);
>         br = new BufferedReader(new InputStreamReader(is));
>         return is;
>     }
>
>     @Override
>     protected void closeFile(InputStream is) throws IOException {
>         super.closeFile(is);
>         br.close();
>         br = null;
>     }
>
>     // interface to the hadoop file system
>     private transient FileSystem fs;
>
>     private FileSystem getFS() {
>         if (fs == null) {
>             try {
>                 fs = FileSystem.get(new Configuration());
>             } catch (Exception e) {
>                 throw new RuntimeException("Unable to get handle to the filesystem");
>             }
>         }
>         return fs;
>     }
>
>     @Override
>     protected ClientRecord readEntity() throws IOException {
>         String line = br.readLine();
>         System.out.println("line is " + line);
>         ClientRecord rec = new ClientRecord();
>         try {
>             InputStream is = getFS().open(new Path(configFile));
>             Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
>                     new InputStreamReader(is), new StringReader(line));
>             parser.setIgnoreExtraColumns(true);
>             final DataSet ds = parser.parse();
>             if (ds == null || ds.getRowCount() == 0) {
>                 throw new RuntimeException("Could not parse record");
>             }
>
>             if (ds.next()) {
>                 for (String col : ds.getColumns()) {
>                     LOG.debug("Col: " + col);
>                 }
>
>                 rec.sourceId = ds.getString("SYS_SRC_ID");
>                 rec.number = ds.getString("CLNT_NO");
>                 rec.divisionId = ds.getString("DIV_ID");
>                 rec.lastName = ds.getString("CLNT_NM");
>                 rec.firstName = ds.getString("CLNT_FRST_NM");
>                 rec.type = ds.getString("CLNT_TYP");
>                 rec.status = ds.getString("STS");
>                 rec.dob = ds.getString("DOB");
>                 rec.address1 = ds.getString("ST_ADDR_1_1");
>                 rec.address2 = ds.getString("ST_ADDR_1_2");
>                 rec.address3 = ds.getString("ST_ADDR_1_3");
>                 rec.address4 = ds.getString("ST_ADDR_1_4");
>                 rec.fileName = fileName;
>             }
>         } catch (java.io.IOException e) {
>             DTThrowable.rethrow(e);
>         }
>
>         LOG.debug("Record: {}", rec);
>         return rec;
>     }
>
>     @Override
>     protected void emit(ClientRecord tuple) {
>         output.emit(tuple);
>     }
> }
>
>
>
>
>
> Regards,
>
> Surya Vamshi
>
> *From:* Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:
> suryavamshivardhan.mukkamula@rbc.com]
> *Sent:* 2016, May, 24 2:57 PM
> *To:* users@apex.apache.org
> *Subject:* RE: Information Needed
>
>
>
> Thank you ram.
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com
> <ram@datatorrent.com>]
> *Sent:* 2016, May, 24 2:53 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> I'll make a sample available in a day or two.
>
>
>
> Ram
>
>
>
> On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Thank you ram, do you have any sample code that deals with multiple
> directories?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 24 12:08 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Information Needed
>
>
>
> For scheduling, there is no built-in support, but you can have a simple script
> that starts the application at a predetermined time (using, for example, dtcli
> commands or the REST API); then, when you are sure that all data for the day
> has been processed and the application is idle, you can shut down the
> application (again, using either dtcli or the REST APIs). I would suggest
> using a scripting language like Ruby or Python since they make many things
> easier than the shell.
>
>
>
> Handling multiple directories is a little more involved: you'll need to
> override the definePartitions() method of the AbstractFileInputOperator and
> possibly the DirectoryScanner as well.
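>
> As a rough illustration, applied to the FeedInputOperator shown above (the
> directory list is hypothetical, and the exact generic signature should be
> taken from the base class):
>
>     @Override
>     public Collection<Partition<AbstractFileInputOperator<ClientRecord>>> definePartitions(
>             Collection<Partition<AbstractFileInputOperator<ClientRecord>>> partitions,
>             PartitioningContext context) {
>         List<String> dirs = Arrays.asList("/data/feed1", "/data/feed2");   // one entry per feed
>         List<Partition<AbstractFileInputOperator<ClientRecord>>> newPartitions = new ArrayList<>(dirs.size());
>         for (String dir : dirs) {
>             FeedInputOperator op = new FeedInputOperator();
>             op.setDirectory(dir);          // each partition scans its own directory
>             newPartitions.add(new DefaultPartition<AbstractFileInputOperator<ClientRecord>>(op));
>         }
>         return newPartitions;
>     }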
>
>
>
> Ram
>
>
>
> On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you all for your valuable inputs. My use case is that there will be 100
> feeds on HDFS in different locations, not all from the same location, and I
> have to read them using DT and load them into a database once daily. What is
> the best way to schedule the DataTorrent batch job? And how would I achieve
> parallel processing when my files are in different folders?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Munagala Ramanath [mailto:ram@datatorrent.com]
> *Sent:* 2016, May, 20 2:35 PM
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> It appears that SFTP support in the Hadoop file system will not be
> available till 2.8.0:
>
> https://issues.apache.org/jira/browse/HADOOP-5732
>
>
>
> So you might have to write your own SFTP operator, or write to HDFS and use
> an external script to write to SFTP.
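>
> If the copy is driven from Java rather than a shell script, a minimal JSch
> sketch could look like this (jsch is already among the POM dependencies
> quoted above; host, credentials and paths are placeholders):
>
> import java.io.InputStream;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> import com.jcraft.jsch.ChannelSftp;
> import com.jcraft.jsch.JSch;
> import com.jcraft.jsch.Session;
>
> public class SftpPush {
>     public static void main(String[] args) throws Exception {
>         Session session = new JSch().getSession("user", "remote.host", 22);
>         session.setPassword("secret");
>         session.setConfig("StrictHostKeyChecking", "no");   // relaxed for the sketch only
>         session.connect();
>
>         ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
>         sftp.connect();
>
>         // stream the HDFS output file straight to the remote server
>         FileSystem fs = FileSystem.get(new Configuration());
>         try (InputStream in = fs.open(new Path("/output/enriched.csv"))) {
>             sftp.put(in, "/incoming/enriched.csv");
>         }
>
>         sftp.disconnect();
>         session.disconnect();
>     }
> }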
>
>
>
> Ram
>
>
>
> On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <
> devendrat@datatorrent.com> wrote:
>
> Hi Surya,
>
>
>
> Good to know the DB reads are working as expected.
>
>
>
> Here's a list of operators you can use/refer for the next use-case,
>
>
>
> HDFS input - for reading multiple input files in parallel, you can set
> partitionCount on the AbstractFileInputOperator for parallel reads.
> LineByLineFileInputOperator is a concrete implementation for reading
> one line at a time.
>
>
>
> xml parsing - there is an XmlParser in Malhar that takes in an XML string
> and emits a POJO.
>
>
>
> Combining multiple files into one - could you please give us a sense of
> the volume and the frequency of writes you expect, so we can recommend
> something appropriate?
>
>
>
> SFTP push - need to check on this one. Will revert.
>
>
>
> @Community, please feel free to chip in.
>
>
>
> Thanks,
>
> Dev
>
>
>
>
>
> On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi Devendra,
>
>
>
> Thank you, it is working now, and I could also read the properties from the
> xml file. I could also set the batch size and the time gap before the next
> database hit.
>
>
>
> Now, another requirement of mine is to read 50 different files from HDFS,
> parse them using xml mapping, and SFTP them as a single file to a UNIX box.
> Can you please suggest the best practice, like using parallel processing or
> partitioning?
>
>
>
> Do you have any sample code for parallel processing or partitioning? Also,
> how would I run the batch job? Is there any batch scheduler that
> DataTorrent provides?
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 4:19 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> Can you try something like this,
>
>
>
>     JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
>     JdbcStore store = new JdbcStore();
>     opr.setStore(store);
>
> The properties would then be set on this store object.
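>
> For example (driver, URL and credential values are placeholders; verify the
> expected connection-properties format on JdbcStore):
>
>     JdbcStore store = new JdbcStore();
>     store.setDatabaseDriver("com.vertica.jdbc.Driver");
>     store.setDatabaseUrl("jdbc:vertica://dbhost:5433/alerts");
>     store.setConnectionProperties("user:dbuser,password:dbpass");
>     opr.setStore(store);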
>
> From the code snippet provided earlier, the store was not being set on the
> JdbcInputOperator2
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hi ,
>
>
>
> Hello,
>
>
>
> When I tried using the property below as you suggested, my launch itself
> fails. When I don't use store and directly assign
> 'dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl',
> the launch is successful but the application run fails with a null pointer
> exception, since the 'store' object is null.
>
>
>
> I see that in AbstractStoreInputOperator.java there is a 'store' variable,
> and I am not clear how its value is set.
>
>
>
> Regards,
>
> Surya Vamshi
>
>
>
> *From:* Devendra Tagare [mailto:devendrat@datatorrent.com]
> *Sent:* 2016, May, 18 12:57 PM
>
>
> *To:* users@apex.incubator.apache.org
> *Subject:* Re: Information Needed
>
>
>
> Hi,
>
>
>
> The property on the store is not getting set since the ".store." qualifier is
> missing. Try the below for all store-level properties.
>
>
>
>
>
> <property>
>   <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
>   <value>{databaseUrl}</value>
> </property>
>
>
>
> Thanks,
>
> Dev
>
>
>
> On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
> suryavamshivardhan.mukkamula@rbc.com> wrote:
>
> Hello,
>
>
>
> Thank you Shubam.
>
>
>
> I have tried using AbstractJdbcInputOperator. Below are the operator and
> the error that I am getting. My observation is that the 'store' and 'context'
> objects are null. Please help to solve this issue.
>
>
>
> *Error Logs:*
>
>
>
> java.lang.NullPointerException
>     at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
>     at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
>     at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
>     at com.datatorrent.stram.engine.Node.setup(Node.java:161)
>     at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
>     at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)
>
>
>
>
>
> *Operator Class:*
>
>
>
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.sql.SQLException;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.api.Context.OperatorContext;
> import com.datatorrent.api.DefaultOutputPort;
> import com.datatorrent.api.Operator;
>
> public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
>         implements Operator.ActivationListener<Context.OperatorContext> {
>
>     private transient PreparedStatement preparedStatement;
>
>     private String query;
>
>     // @OutputPortFieldAnnotation(schemaRequired = true)
>     public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();
>
>     public JdbcInputOperator2() {
>         super();
>     }
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>
>         try {
>             preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
>             System.out.println("store value is" + store);
>         } catch (Exception e) {
>         }
>     }
>
>     @Override
>     public Object getTuple(ResultSet result) {
>         // TODO Auto-generated method stub
>         StringBuilder sb = new StringBuilder();
>         try {
>             System.out.println("result set" + result);
>             while (result.next()) {
>                 sb.append(result.getString("CLNT_NO"));
>                 sb.append(",");
>                 sb.append(result.getString("TR_NO"));
>                 System.out.println("tuple value" + sb.toString());
>             }
>         } catch (SQLException e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>
>         return sb.toString();
>     }
>
>     @Override
>     public String queryToRetrieveData() {
>         // TODO Auto-generated method stub
>         return query;
>     }
>
>     @Override
>     public void activate(OperatorContext arg0) {
>         // TODO Auto-generated method stub
>     }
>
>     @Override
>
>
>
> ...
>
> [Message clipped]
>
>
>