In the most recent release (3.4.0), there is an enrichment operator. Please take a look at
https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/enrich/POJOEnricher.java

It has a configurable BackendLoader for retrieving data from the enrichment DB and a couple of concrete implementations for files or JDBC.
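
A rough sketch of how it might be wired into a DAG (untested; the setter and port names used here, setLookupFields(), setIncludeFields(), setTableName(), input/output, are from memory and worth checking against the 3.4.0 javadoc; the field names, table and JDBC settings are placeholders):

// sketch only: all values below are placeholders
JDBCLoader backendStore = new JDBCLoader();
backendStore.setDatabaseDriver("com.vertica.jdbc.Driver");
backendStore.setDatabaseUrl("jdbc:vertica://host:5433/db");
backendStore.setTableName("ALERT_REFERENCE");                      // lookup table in the enrichment DB

POJOEnricher enricher = dag.addOperator("enrich", new POJOEnricher());
enricher.setStore(backendStore);
enricher.setLookupFields(Arrays.asList("alertId"));                // key(s) used to query the backend
enricher.setIncludeFields(Arrays.asList("custName", "custType"));  // columns copied onto the output POJO

dag.addStream("alerts", alertReader.outputPort, enricher.input);   // alertReader/fileWriter are your own operators
dag.addStream("enriched", enricher.output, fileWriter.input);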

How big is the enrichment DB likely to be?

Ram

On Wed, Jun 8, 2016 at 7:39 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi Ram,

 

Can you please suggest whether we can read data from Hive tables in a DataTorrent workflow?

 

My use case is that I have to read alerts from a Vertica database one by one, and in the next operator enrich each alert by reading from another source (Hive/Vertica/HBase).

 

My question is: while I am reading the alerts as a continuous flow, I need to talk to another source on the fly to enrich the data. What would be the best solution for that: Hive, Vertica (or any other database), or HBase?

 

Each day I have to process around 20k alerts in my batch job.

 

Regards,

Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 03 6:19 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

One way to do this is to have a new operator which gets information about which alerts to update from the input operator; it saves this info but does not act on it until it gets a trigger from the operator that does the final write to HDFS (so it will have 2 input ports).
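
A minimal skeleton of such an operator might look like this (just a sketch: the tuple types, the trigger type and the actual Vertica UPDATE are placeholders for you to fill in, and the imports assume the 3.x package layout):

import java.util.ArrayList;
import java.util.List;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

public class AlertUpdateOperator extends BaseOperator
{
  private final List<String> pendingAlertIds = new ArrayList<>();

  // port 1: alert ids coming from the input (reader) operator
  public final transient DefaultInputPort<String> alertInput = new DefaultInputPort<String>()
  {
    @Override
    public void process(String alertId)
    {
      pendingAlertIds.add(alertId);   // save the info, but do not act on it yet
    }
  };

  // port 2: trigger from the operator that does the final write to HDFS
  public final transient DefaultInputPort<String> triggerInput = new DefaultInputPort<String>()
  {
    @Override
    public void process(String trigger)
    {
      flushToVertica();               // file is safely on HDFS, now mark the alerts as processed
    }
  };

  private void flushToVertica()
  {
    // placeholder: run an UPDATE ... WHERE alert_id IN (...) against Vertica over JDBC
    pendingAlertIds.clear();
  }
}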

 

Ram

 

On Fri, Jun 3, 2016 at 12:05 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

 

Thank you Ram,  I need another suggestion. Please help.

 

I am reading alert data from a Vertica database using a DataTorrent workflow. Once the data is enriched and written to a file, we need to update back the alerts that we read from Vertica.

 

In our current read operator we read the data by keeping it in a buffered queue (I guess this was your suggestion when you were with us). My question is: can we update the alerts back in Vertica once they have been written to the file system on HDFS? We need this because we do not want to read the same alerts again the next day.

 

If you have a better way, please let us know.

 

The thread class which reads the data from the database is below, FYR.

 

package com.rbc.aml.cnscan.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

public class JDBCQueueWriter extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCQueueWriter.class);

    private int offset;
    protected String jdbcDriver;
    protected String url;
    protected String user;
    protected String password;
    protected String sql;
    ConcurrentLinkedQueue<Object[]> bufferQueue;

    public JDBCQueueWriter() {
        super();
    }

    public JDBCQueueWriter(String jdbcDriver, String url, String user, String password, String sql,
                           ConcurrentLinkedQueue<Object[]> bufferQueue, int offset) {
        super();
        this.jdbcDriver = jdbcDriver;
        this.url = url;
        this.user = user;
        this.password = password;
        this.sql = sql;
        this.bufferQueue = bufferQueue;
        this.offset = offset;
    }

    @Override
    public void run() {
        if (sql != null && sql.length() > 0) {
            try {
                Class.forName(jdbcDriver);
            } catch (ClassNotFoundException e) {
                LOG.error(e.getMessage());
            }

            try (Connection con = DriverManager.getConnection(url, user, password);
                 Statement stmt = con.createStatement()) {
                ResultSet rs = stmt.executeQuery(sql);

                // skip past the rows that have already been processed
                for (int i = 0; i < offset && rs.next(); i++);

                ResultSetMetaData metadata = rs.getMetaData();
                int columnCount = metadata.getColumnCount();
                while (rs.next()) {
                    Object[] values = new Object[columnCount];
                    for (int i = 0; i < columnCount; i++) {
                        values[i] = rs.getObject(i + 1);
                        if (values[i] instanceof String) {
                            values[i] = ((String) values[i]).trim();
                        }
                    }
                    bufferQueue.add(values);
                }
                rs.close();
            } catch (SQLException e) {
                LOG.error(e.getMessage());
            } finally {
                // sending an ending signal
                bufferQueue.add(new Object[0]);
            }
        }
    }
}

 

Regards,

Surya Vamshi

 

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, June, 02 5:31 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

One possible interim approach is to copy over the relevant code from Hadoop 2.8 into your Apex application and see if it compiles and runs properly with the "sftp://" protocol prefix.

It would need a bit of effort to test and validate, but it might be worth it.

 

Ram

 

On Thu, Jun 2, 2016 at 2:17 PM, Thomas Weise <thomas.weise@gmail.com> wrote:

I created a JIRA to track this:  

 

 

Hadoop 2.8 with the existing Malhar file operators could be a solution.

 

 

On Thu, Jun 2, 2016 at 1:55 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi ,

 

Sorry, I understand that the SFTP option is not yet available in DataTorrent. I am assuming that I should create the files on HDFS and then have a script SFTP the files to another server, as per Ram's suggestion.

 

Regards,

Surya Vamshi

 

From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, June, 01 4:55 PM
To: users@apex.apache.org
Subject: RE: Information Needed

 

Thank you Ram and Devendra,

 

I could read the DB with parallel queries.

 

In the DataTorrent class APIs I see only AbstractFTPInputOperator, but I did not see an AbstractFTPOutputOperator.

 

Can I use AbstractFileOutputOperator to FTP the output file to a different server (outside the Hadoop cluster)? Can you please suggest how I would do that?

 

Regards,

Surya Vamshi

 

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 6:39 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

Hi,

 

Could you please bump up the malhar version to 3.4.0, since KryoCloneUtils is part of that release.

 

<groupId>org.apache.apex</groupId>
<artifactId>malhar</artifactId>
<version>3.4.0</version>

 

Thanks,

Dev

 

 

On Tue, May 31, 2016 at 10:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi ,

 

I have already added that dependency within my POM, but I still cannot get the class. Below are my dependencies.

 

<properties>
    <!-- change this if you desire to use a different version of DataTorrent -->
    <datatorrent.version>3.1.1</datatorrent.version>
    <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath>
</properties>

<dependencies>
    <!-- add your dependencies here -->
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>malhar-library</artifactId>
        <version>${datatorrent.version}</version>
    </dependency>
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>dt-common</artifactId>
        <version>${datatorrent.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>malhar-library</artifactId>
        <version>${datatorrent.version}</version>
        <!-- If you know that your application does not need transitive dependencies
             pulled in by malhar-library, uncomment the following to reduce the size of
             your app package. -->
        <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId>
             </exclusion> </exclusions> -->
    </dependency>
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>dt-engine</artifactId>
        <version>${datatorrent.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>dt-common</artifactId>
        <version>${datatorrent.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.teradata.jdbc</groupId>
        <artifactId>terajdbc4</artifactId>
        <version>14.00.00.21</version>
    </dependency>
    <dependency>
        <groupId>com.teradata.jdbc</groupId>
        <artifactId>tdgssconfig</artifactId>
        <version>14.00.00.21</version>
    </dependency>
    <dependency>
        <groupId>com.ibm.db2</groupId>
        <artifactId>db2jcc</artifactId>
        <version>123</version>
    </dependency>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>C:/Program Files/Java/jdk1.8.0_60/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-contrib</artifactId>
        <version>3.2.0-incubating</version>
        <!--<scope>provided</scope> -->
        <exclusions>
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.vertica</groupId>
        <artifactId>vertica-jdbc</artifactId>
        <version>7.2.1-0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.19</version>
    </dependency>
    <dependency>
        <groupId>com.datatorrent</groupId>
        <artifactId>dt-engine</artifactId>
        <version>${datatorrent.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>net.sf.flatpack</groupId>
        <artifactId>flatpack</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.jdom</groupId>
        <artifactId>jdom</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi-ooxml</artifactId>
        <version>3.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xmlbeans</groupId>
        <artifactId>xmlbeans</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>dom4j</groupId>
        <artifactId>dom4j</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>javax.xml.stream</groupId>
        <artifactId>stax-api</artifactId>
        <version>1.0-2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi</artifactId>
        <version>3.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi-ooxml-schemas</artifactId>
        <version>3.9</version>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jsch</artifactId>
        <version>0.1.53</version>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jsch</artifactId>
        <version>0.1.53</version>
    </dependency>
</dependencies>

 

Regards,

Surya Vamshi

 

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 31 12:47 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

Hi,

 

Both of them are in malhar-library and should be pulled in once you include apex and apex-malhar. Can you check if adding malhar works?

 

<parent>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar</artifactId>
  <version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>malhar-library</artifactId>
<packaging>jar</packaging>

 

Thanks,

Dev

 

On Tue, May 31, 2016 at 7:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi Devendra,

 

JdbcPollInputOperator serves my purpose of a DB pull with partitions. Can you please help me with the POM dependency for the below classes?

 

import org.apache.apex.malhar.lib.wal.WindowDataManager;

import com.datatorrent.lib.util.KryoCloneUtils;

 

Regards,

Surya Vamshi

 

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 30 8:28 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

Hi,

 

In the definePartitions method above, can you check if the store object from AbstractStoreInputOperator is initialized? You can use the Java bean syntax to set properties.

The store properties to check are,

 

store.getDatabaseDriver();
store.getDatabaseUrl();
store.getConnectionProperties();

 

Also, you would need to call store.connect inside definePartitions before adding a new partition.

Can you check the same and post the stack trace if it does not work? The current logger is pointing to an unused class; could you please edit it to the below:

 

 private static final Logger LOG = LoggerFactory.getLogger(ParallelJdbcInputOperator.class); 

 

You can check an open PR for doing parallel, idempotent reads from JDBC for reference: https://github.com/apache/incubator-apex-malhar/pull/282/commits
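
For example, inside definePartitions, something along these lines (the values are placeholders, and the setter names simply mirror the getters above, so please double-check them against your JdbcStore version):

JdbcStore store = new JdbcStore();
store.setDatabaseDriver("com.vertica.jdbc.Driver");           // placeholder driver
store.setDatabaseUrl("jdbc:vertica://host:5433/db");          // placeholder URL
store.setConnectionProperties("user:dbuser,password:dbpw");   // placeholder credentials
readOperator.setStore(store);
store.connect();                                              // connect before adding the partition
newPartitions.add(new DefaultPartition<ParallelJdbcInputOperator>(readOperator));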

 

Thanks,

Dev

 

 

On Mon, May 30, 2016 at 1:01 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi Ram,

 

Thank you, I could check the logs on the server. The error I am getting is that the store object is not set on the individual partitions. I tried setting the store in the definePartitions method, but no luck. Below is my code; can you please suggest a way forward?

(I am trying to set up parallel processing by defining a separate query for each partition so that data can be fetched in parallel.)

 

######################Operator######################################################

 

package com.rbc.aml.cnscan.operator;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.Constants;

import java.sql.PreparedStatement;

public class ParallelJdbcInputOperator extends AbstractJdbcInputOperator<Object>
        implements Partitioner<ParallelJdbcInputOperator> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcInputOperator.class);
    private transient PreparedStatement preparedStatement;
    private ResultSet resultSet = null;
    private String query;
    private long queryInterval = 50000;
    private long nextQueryTime = 0;

    public ParallelJdbcInputOperator() {
        super();
    }

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
    }

    @Override
    public void emitTuples() {
        if (resultSet == null) {
            String query = queryToRetrieveData();
            if (query != null) {
                LOG.info(String.format("select statement: %s", query));
                try {
                    queryStatement.setQueryTimeout(120);
                    resultSet = queryStatement.executeQuery(query);
                } catch (SQLException ex) {
                    store.disconnect();
                    throw new RuntimeException(String.format("Error while running query: %s", query), ex);
                }
            }
        }

        if (resultSet != null) {
            try {
                boolean hasNext;
                for (int i = 0; (hasNext = resultSet.next()) && i < emitBatchSize; i++) {
                    Object tuple = getTuple(resultSet);
                    if (tuple != null) {
                        outputPort.emit(tuple);
                    }
                }
                if (!hasNext) {
                    resultSet.close();
                    resultSet = null;
                }
            } catch (SQLException ex) {
                store.disconnect();
                throw new RuntimeException(String.format("Error while running retriving data"), ex);
            }
        }
    }

    @Override
    public Object getTuple(ResultSet result) {
        // TODO Auto-generated method stub
        StringBuilder sb = new StringBuilder();
        try {
            sb.append(result.getString(1));
            sb.append(",");
            sb.append(result.getString(2));
            sb.append(",");
            sb.append(result.getString(3));
            sb.append(",");
            sb.append(result.getString(4));
            sb.append(",");
            sb.append(result.getString(5));
            System.out.println("tuple value" + sb.toString());
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return sb.toString();
    }

    @Override
    public String queryToRetrieveData() {
        if (System.currentTimeMillis() < nextQueryTime) {
            return null;
        }
        nextQueryTime = System.currentTimeMillis() + queryInterval;
        return query;
    }

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    protected int emitBatchSize = 3000;

    public int getEmitBatchSize() {
        return emitBatchSize;
    }

    public void setEmitBatchSize(int emitBatchSize) {
        this.emitBatchSize = emitBatchSize;
    }

    @Override
    public Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> definePartitions(
            Collection<com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions,
            com.datatorrent.api.Partitioner.PartitioningContext context) {
        int partitionSize;
        ArrayList<Partition<ParallelJdbcInputOperator>> newPartitions = new ArrayList<Partition<ParallelJdbcInputOperator>>();

        partitionSize = Constants.SOURCE_CODE_LIST.size();

        for (int i = 0; i < partitionSize; i++) {
            try {
                ParallelJdbcInputOperator readOperator = new ParallelJdbcInputOperator();
                String newQuery = Constants.CLIENT_QUERY.replaceAll("\\?", Constants.SOURCE_CODE_LIST.get(i));
                readOperator.setQuery(newQuery);
                System.out.println("The New Query is:" + readOperator.getQuery());
                JdbcStore store = new JdbcStore();
                readOperator.setStore(store);
                Partition<ParallelJdbcInputOperator> partition =
                        new DefaultPartition<ParallelJdbcInputOperator>(readOperator);
                newPartitions.add(partition);
            } catch (Throwable ex) {
                DTThrowable.rethrow(ex);
            }
        }

        return newPartitions;
    }

    @Override
    public void partitioned(
            Map<Integer, com.datatorrent.api.Partitioner.Partition<ParallelJdbcInputOperator>> partitions) {
        // TODO Auto-generated method stub
    }
}

 

Regards,

Surya Vamshi

 

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 30 2:42 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

You'll need to find which node is running YARN (Resource Manager) and look for log files on that machine.

Usually, they are under /var/log/hadoop-yarn or /var/log/hadoop or similar locations.

 

The files themselves will have names that vary depending on your installation; some examples:

yarn-<user>-resourcemanager-<host>.log

hadoop-cmf-yarn-RESOURCEMANAGER-<host>.log.out

 

Look for lines similar to the following that reference the failed application id; they will tell you what containers were allocated on which nodes on behalf of this application. You may then have to ssh into those nodes and check the specific container logs for more specific information on why it may have failed.

 

Ram

-----------------------------------

 

2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=<user>  OPERATION=AM Allocated Container        TARGET=SchedulerApp     RESULT=SUCCESS  APPID=application_1462948052533_0036    CONTAINERID=container_1462948052533_0036_01_022468
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode: Assigned container container_1462948052533_0036_01_022468 of capacity <memory:1536, vCores:1> on host <host:port>, which has 9 containers, <memory:24064, vCores:9> used and <memory:180736, vCores:15> available after allocation
2016-05-24 02:53:42,636 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1462948052533_0183_01_036933 Container Transitioned from ALLOCATED to ACQUIRED

 

 

On Mon, May 30, 2016 at 9:24 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hello,

 

I am trying to read a Vertica database table with partitioning; I tried my hand at partitioning by overriding the definePartitions method. The launch is successful, but when I look at the Monitor tab the job is in a failed state, and I cannot see the logs for the failed job on the DT web console.

 

Is there any way that I can view the logs on the UNIX machine for the failed jobs?

 

Regards,

Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR)
Sent: 2016, May, 27 9:33 AM
To: users@apex.apache.org
Subject: RE: Information Needed

 

Hi Ram,

 

Thank you so much, it worked !!

 

I am done with reading and parsing a single input feed using a configuration file.

 

I would like to do this for 100 feeds and 100 configuration files using partitioning. I guess I have to know how to set an individual feed directory and configuration file per partition, if I am not wrong.

 

While I wait for your sample code for partitioning, I will try to understand partitioning in the meantime.

 

Your support is well appreciated.

 

Regards,

Surya Vamshi

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 26 7:32 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

You need to return null from readEntity() when br.readLine() returns null, to signal that EOF has been reached.
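
In other words, something like this at the top of readEntity() (sketch):

String line = br.readLine();
if (line == null) {
  return null;   // no more lines: tells AbstractFileInputOperator this file is finished
}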

 

Ram

 

On Thu, May 26, 2016 at 2:07 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi Priyanka,

 

There is only a single file in the directory and there are no external updates. The same code was working for a simple file read from HDFS, but when I added the parsing part it started going into an infinite loop.

 

Regards,

Surya Vamshi

From: Priyanka Gugale [mailto:priyanka@datatorrent.com]
Sent: 2016, May, 26 5:03 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

There is a setting called "scanIntervalMillis" that keeps scanning the input directory for newly added files. In your case, are new files getting added to the directory? Or is the input file's timestamp getting updated?

 

-Priyanka

 

On Thu, May 26, 2016 at 12:36 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hello,

 

I am trying to read a file from HDFS, parse it using an XML configuration file, and print it on the console. The issue I am facing is that the file read goes into an infinite loop; I am not sure how to make it read the file only once. Please help.

 

My Operator code:

 

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import com.rbc.aml.cnscan.utils.ClientRecord;

import net.sf.flatpack.DataSet;
import net.sf.flatpack.DefaultParserFactory;
import net.sf.flatpack.Parser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;

import javax.validation.constraints.NotNull;

public class FeedInputOperator extends AbstractFileInputOperator<ClientRecord> {
    private static Logger LOG = LoggerFactory.getLogger(FeedInputOperator.class);

    protected transient BufferedReader br;
    protected String fileName;
    public transient DefaultOutputPort<ClientRecord> output = new DefaultOutputPort<>();

    @NotNull
    private String configFile = null;

    public String getConfigFile() {
        return configFile;
    }

    public void setConfigFile(String file) {
        configFile = file;
    }

    @Override
    protected InputStream openFile(Path path) throws IOException {
        InputStream is = super.openFile(path);
        fileName = path.getName();
        System.out.println("input file is" + fileName);
        br = new BufferedReader(new InputStreamReader(is));
        return is;
    }

    @Override
    protected void closeFile(InputStream is) throws IOException {
        super.closeFile(is);
        br.close();
        br = null;
    }

    // interface to the hadoop file system
    private transient FileSystem fs;

    private FileSystem getFS() {
        if (fs == null) {
            try {
                fs = FileSystem.get(new Configuration());
            } catch (Exception e) {
                throw new RuntimeException("Unable to get handle to the filesystem");
            }
        }
        return fs;
    }

    @Override
    protected ClientRecord readEntity() throws IOException {
        String line = br.readLine();
        System.out.println("line is " + line);
        ClientRecord rec = new ClientRecord();
        try {
            InputStream is = getFS().open(new Path(configFile));
            Parser parser = DefaultParserFactory.getInstance().newFixedLengthParser(
                    new InputStreamReader(is), new StringReader(line));
            parser.setIgnoreExtraColumns(true);
            final DataSet ds = parser.parse();
            if (ds == null || ds.getRowCount() == 0) {
                throw new RuntimeException("Could not parse record");
            }

            if (ds.next()) {
                for (String col : ds.getColumns()) {
                    LOG.debug("Col: " + col);
                }

                rec.sourceId = ds.getString("SYS_SRC_ID");
                rec.number = ds.getString("CLNT_NO");
                rec.divisionId = ds.getString("DIV_ID");
                rec.lastName = ds.getString("CLNT_NM");
                rec.firstName = ds.getString("CLNT_FRST_NM");
                rec.type = ds.getString("CLNT_TYP");
                rec.status = ds.getString("STS");
                rec.dob = ds.getString("DOB");
                rec.address1 = ds.getString("ST_ADDR_1_1");
                rec.address2 = ds.getString("ST_ADDR_1_2");
                rec.address3 = ds.getString("ST_ADDR_1_3");
                rec.address4 = ds.getString("ST_ADDR_1_4");
                rec.fileName = fileName;
            }
        } catch (java.io.IOException e) {
            DTThrowable.rethrow(e);
        }

        LOG.debug("Record: {}", rec);
        return rec;
    }

    @Override
    protected void emit(ClientRecord tuple) {
        output.emit(tuple);
    }
}

 

 

Regards,

Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkamula@rbc.com]
Sent: 2016, May, 24 2:57 PM
To: users@apex.apache.org
Subject: RE: Information Needed

 

Thank you ram.

 

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 24 2:53 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

I'll make a sample available in a day or two.

 

Ram

 

On Tue, May 24, 2016 at 11:33 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi ,

 

Thank you ram, do you have any sample code that deals with multiple directories?

 

Regards,

Surya Vamshi

 

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 24 12:08 PM
To: users@apex.apache.org
Subject: Re: Information Needed

 

For scheduling, there is no built-in support, but you can have a simple script that starts the application at a predetermined time (using, for example, dtcli commands or the REST API); then, when you are sure that all data for the day has been processed and the application is idle, you can shut down the application (again, using either dtcli or the REST API). I would suggest using a scripting language like Ruby or Python since they make many things easier than the shell.

 

Handling multiple directories is a little more involved: you'll need to override the definePartitions() method of AbstractFileInputOperator and possibly the DirectoryScanner as well.
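
As a very rough sketch of the definePartitions() piece (the reader class and directory names are made up; a real version also needs the usual clone/checkpoint handling):

// one partition per input directory
List<String> dirs = Arrays.asList("/data/feeds/dir1", "/data/feeds/dir2");   // placeholder directories
List<Partition<MyFeedReader>> newPartitions = new ArrayList<>();
for (String dir : dirs) {
  MyFeedReader reader = new MyFeedReader();   // hypothetical subclass of AbstractFileInputOperator
  reader.setDirectory(dir);                   // each partition scans only its own directory
  newPartitions.add(new DefaultPartition<MyFeedReader>(reader));
}
return newPartitions;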

 

Ram

 

On Tue, May 24, 2016 at 6:16 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hello,

 

Thank you all for your valuable inputs. My use case is that there will be 100 feeds on HDFS in different locations, not all in the same place, and I have to read them using DT and load them into a database once daily. What is the best way to schedule the DataTorrent batch job? And how would I achieve parallel processing when my files are in different folders?

 

Regards,

Surya Vamshi

 

From: Munagala Ramanath [mailto:ram@datatorrent.com]
Sent: 2016, May, 20 2:35 PM
To: users@apex.incubator.apache.org
Subject: Re: Information Needed

 

It appears that SFTP support in the Hadoop file system will not be available till 2.8.0:

 

So you might have to write your own SFTP operator or write to HDFS and use an external script to write to SFTP.
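
If you go the external-script route, even a small standalone Java program using the jsch library could do the push, roughly like this (host, credentials and paths are placeholders):

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

public class SftpPush {
  public static void main(String[] args) throws Exception {
    JSch jsch = new JSch();
    Session session = jsch.getSession("user", "remote.host.example", 22);  // placeholder host/user
    session.setPassword("secret");                                         // placeholder password
    session.setConfig("StrictHostKeyChecking", "no");                      // relaxed only for this sketch
    session.connect();

    ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
    sftp.connect();
    sftp.put("/tmp/output/part-00000", "/remote/incoming/output.dat");     // copy the file out of HDFS first
    sftp.disconnect();
    session.disconnect();
  }
}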

 

Ram

 

On Fri, May 20, 2016 at 11:21 AM, Devendra Tagare <devendrat@datatorrent.com> wrote:

Hi Surya,

 

Good to know the DB reads are working as expected.

 

Here's a list of operators you can use/refer for the next use-case,

 

HDFS input - for reading multiple input files in parallel you can set partitionCount on the AbstractFileInputOperator for parallel reads. LineByLineFileInputOperator is a concrete implementation for reading one line at a time (a short sketch follows below).

xml parsing - there is an XmlParser in Malhar that takes in an XML string and emits a POJO.

Combining multiple files into one - could you please give us a sense of the volume and the frequency of writes you expect, so we can recommend something appropriate?

SFTP push - need to check on this one. Will revert.
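
For the HDFS input piece above, the setup is roughly as follows (the directory and partition count are placeholders; partitionCount is the bean property mentioned above):

LineByLineFileInputOperator reader =
    dag.addOperator("feedReader", new LineByLineFileInputOperator());
reader.setDirectory("/user/feeds/incoming");   // placeholder HDFS directory
reader.setPartitionCount(4);                   // 4 parallel readers over the files in that directory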

 

@Community, please feel free to chip in.

 

Thanks,

Dev

 

 

On Fri, May 20, 2016 at 8:54 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi Devendra,

 

Thank you, it is working now, and I could also read the properties from the XML file. I could also set the batch size and the time gap before the next database hit.

 

Now, my next requirement is to read 50 different files from HDFS, parse them using XML mapping, and SFTP them as a single file to a UNIX box. Can you please suggest the best practice, such as using parallel processing or partitioning?

 

Do you have any sample code for parallel processing or partitioning? Also, how would I run the batch job? Is there any batch scheduler that DataTorrent provides?

 

Regards,

Surya Vamshi

 

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 18 4:19 PM


To: users@apex.incubator.apache.org
Subject: Re: Information Needed

 

Hi,

 

Can you try something like this,

 

JdbcPOJOInputOperator opr = dag.addOperator("JdbcPojo", new JdbcPOJOInputOperator());
JdbcStore store = new JdbcStore();
opr.setStore(store);

The properties would then be set on this store object.

From the code snippet provided earlier, the store was not being set on the JdbcInputOperator2.

Thanks,

Dev

 

On Wed, May 18, 2016 at 12:50 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hi ,

 

Hello,

 

When I tried using the below property as you suggested, my launch itself fails. When I don't use store and directly assign ('dt.application.CountryNameScan.operator.<operatorName>.prop.databaseUrl'), the launch is successful but the application run fails with a null pointer exception, since the 'store' object is null.

 

I see that in AbstractStoreInputOperator.java there is a 'store' variable, and I am not clear how its value gets set.

 

Regards,

Surya Vamshi

 

From: Devendra Tagare [mailto:devendrat@datatorrent.com]
Sent: 2016, May, 18 12:57 PM


To: users@apex.incubator.apache.org
Subject: Re: Information Needed

 

Hi,

 

The property on the store is not getting set since the ".store." qualifier is missing. Try the below for all store-level properties.

 

 

<property>
  <name>dt.application.CountryNameScan.operator.<operatorName>.prop.store.databaseUrl</name>
  <value>{databaseUrl}</value>
</property>

 

Thanks,

Dev

 

On Wed, May 18, 2016 at 8:38 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkamula@rbc.com> wrote:

Hello,

 

Thank you Shubam.

 

I have tried using AbstractJdbcInputOperator. Below are the operator and the error that I am getting. My observation is that the 'store' and 'context' objects are null. Please help solve this issue.

 

Error Logs:

 

java.lang.NullPointerException
    at com.datatorrent.lib.db.AbstractStoreInputOperator.setup(AbstractStoreInputOperator.java:77)
    at com.rbc.aml.cnscan.operator.AbstractJdbcInputOperator.setup(AbstractJdbcInputOperator.java:99)
    at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:29)
    at com.rbc.aml.cnscan.operator.JdbcInputOperator2.setup(JdbcInputOperator2.java:13)
    at com.datatorrent.stram.engine.Node.setup(Node.java:161)
    at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1287)
    at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:92)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1361)

 

 

Operator Class:

 

import java.sql.ResultSet;
import java.sql.SQLException;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import java.sql.PreparedStatement;
import com.datatorrent.api.Operator;

public class JdbcInputOperator2 extends AbstractJdbcInputOperator<Object>
        implements Operator.ActivationListener<Context.OperatorContext> {

    private transient PreparedStatement preparedStatement;

    private String query;

    // @OutputPortFieldAnnotation(schemaRequired = true)
    public final transient DefaultOutputPort<Object> outputPort = new DefaultOutputPort<Object>();

    public JdbcInputOperator2() {
        super();
    }

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);

        try {
            preparedStatement = store.connection.prepareStatement(queryToRetrieveData());
            System.out.println("store value is" + store);
        } catch (Exception e) {

        }
    }

    @Override
    public Object getTuple(ResultSet result) {
        // TODO Auto-generated method stub
        StringBuilder sb = new StringBuilder();
        try {
            System.out.println("result set" + result);
            while (result.next()) {
                sb.append(result.getString("CLNT_NO"));
                sb.append(",");
                sb.append(result.getString("TR_NO"));
                System.out.println("tuple value" + sb.toString());
            }
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return sb.toString();
    }

    @Override
    public String queryToRetrieveData() {
        // TODO Auto-generated method stub
        return query;
    }

    @Override
    public void activate(OperatorContext arg0) {
        // TODO Auto-generated method stub
    }

    @Override

...


 
