Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 18BDE200CA4 for ; Wed, 7 Jun 2017 20:56:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1747C160BBF; Wed, 7 Jun 2017 18:56:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E90D3160BE5 for ; Wed, 7 Jun 2017 20:56:05 +0200 (CEST) Received: (qmail 25481 invoked by uid 500); 7 Jun 2017 18:56:05 -0000 Mailing-List: contact commits-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list commits@apex.apache.org Received: (qmail 25337 invoked by uid 99); 7 Jun 2017 18:56:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jun 2017 18:56:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6A3F8E029E; Wed, 7 Jun 2017 18:56:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: thw@apache.org To: commits@apex.apache.org Date: Wed, 07 Jun 2017 18:56:06 -0000 Message-Id: <003c1a5130314f6681e27b504c3697b2@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [4/6] apex-malhar git commit: Fixed tests and POM.Changes related to sql connector. archived-at: Wed, 07 Jun 2017 18:56:08 -0000 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/resources/META-INF/properties-JdbcToJdbcApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/META-INF/properties-JdbcToJdbcApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-JdbcToJdbcApp.xml new file mode 100644 index 0000000..d9199c6 --- /dev/null +++ b/examples/jdbc/src/main/resources/META-INF/properties-JdbcToJdbcApp.xml @@ -0,0 +1,88 @@ + + + + + + + + dt.operator.JdbcInput.prop.store.databaseDriver + + org.hsqldb.jdbcDriver + + + + + dt.operator.JdbcInput.prop.store.databaseUrl + + jdbc:hsqldb:mem:test + + + + + dt.operator.JdbcInput.prop.fetchSize + + 120 + + + + + dt.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS + + org.apache.apex.examples.JdbcToJdbc.PojoEvent + + + + + dt.operator.JdbcInput.prop.query + + select * from test_event_table + + + + + + dt.operator.JdbcInput.prop.tableName + + test_event_table + + + + + dt.operator.JdbcOutput.prop.store.databaseDriver + + org.hsqldb.jdbcDriver + + + + + dt.operator.JdbcOutput.prop.store.databaseUrl + + jdbc:hsqldb:mem:test + + + + + dt.operator.JdbcOutput.prop.batchSize + + 5 + + + + + dt.operator.JdbcOutput.prop.tablename + + test_output_event_table + + + + + dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS + + org.apache.apex.examples.JdbcToJdbc.PojoEvent + + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml new file mode 100644 index 0000000..b67f845 --- /dev/null +++ b/examples/jdbc/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml @@ -0,0 +1,71 @@ + + + + + dt.application.operator.JdbcPoller.prop.partitionCount + 2 + + + + dt.application.operator.JdbcPoller.prop.store.databaseDriver + + org.hsqldb.jdbcDriver + + + + dt.application.operator.JdbcPoller.prop.store.databaseUrl + + jdbc:hsqldb:mem:test + + + + + + + dt.application.operator.JdbcPoller.prop.batchSize + 50 + + + + + dt.application.operator.JdbcPoller.prop.key + ACCOUNT_NO + + + + dt.application.operator.JdbcPoller.prop.columnsExpression + ACCOUNT_NO,NAME,AMOUNT + + + dt.application.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS + org.apache.apex.examples.JdbcIngest.PojoEvent + + + + + dt.application.operator.JdbcPoller.prop.tableName + test_event_table + + + + dt.application.operator.JdbcPoller.prop.pollInterval + 1000 + + + + + dt.application.operator.Writer.filePath + /tmp/test/output + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml b/examples/jdbc/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml new file mode 100644 index 0000000..589dbcd --- /dev/null +++ b/examples/jdbc/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml @@ -0,0 +1,66 @@ + + + + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseDriver + + org.hsqldb.jdbcDriver + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl + + jdbc:hsqldb:mem:test + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.fetchSize + + 50 + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.query + + select * from test_event_table + + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.tableName + + test_event_table + + + + + dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS + + org.apache.apex.examples.JdbcIngest.PojoEvent + + + + + dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.filePath + + /tmp/jdbcApp + + + + dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.rotationWindows + + 5 + + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/main/resources/schema.json ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/main/resources/schema.json b/examples/jdbc/src/main/resources/schema.json new file mode 100755 index 0000000..3c191cf --- /dev/null +++ b/examples/jdbc/src/main/resources/schema.json @@ -0,0 +1,19 @@ +{ + "separator": ",", + "quoteChar":"\"", + "fields": [ + { + "name": "AccountNumber", + "type": "INTEGER" + }, + { + "name": "Name", + "type": "String" + }, + { + "name": "Amount", + "type": "INTEGER" + } + ] +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/FileToJdbcApp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/FileToJdbcApp/ApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/FileToJdbcApp/ApplicationTest.java new file mode 100755 index 0000000..3024836 --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/FileToJdbcApp/ApplicationTest.java @@ -0,0 +1,131 @@ +package org.apache.apex.examples.FileToJdbcApp; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; +import com.datatorrent.netlet.util.DTThrowable; + +/** + * Test the DAG declaration in local mode.
+ * The assumption to run this test case is that test_jdbc_table + * and meta-table are created already. + */ +public class ApplicationTest { + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_jdbc_table"; + + @BeforeClass + public static void setup() { + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + + ")"; + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + + } catch (Throwable e) { + DTThrowable.rethrow(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(DB_URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testCsvParserApp() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(new File("src/test/resources/test-FileToJdbcApp.xml").toURI().toURL()); + + lma.prepareDAG(new FileToJdbcCsvParser(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); // test will terminate after results are available + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + cleanTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + + @Test + public void testCustomParserApp() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(new File("src/test/resources/test-FileToJdbcApp.xml").toURI().toURL()); + + lma.prepareDAG(new FileToJdbcCustomParser(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); // test will terminate after results are available + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + cleanTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/ApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/ApplicationTest.java new file mode 100644 index 0000000..080eae8 --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/ApplicationTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.JdbcIngest; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode.
+ * The assumption to run this test case is that test_event_table is created + * already + */ +public class ApplicationTest +{ + + @Test + @Ignore + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); + lma.prepareDAG(new JdbcHDFSApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcInputAppTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcInputAppTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcInputAppTest.java new file mode 100644 index 0000000..574534f --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcInputAppTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.JdbcIngest; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Application test for {@link JdbcHDFSApp} + */ +public class JdbcInputAppTest +{ + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_event_table"; + private static final String FILE_NAME = "/tmp/jdbcApp"; + + @BeforeClass + public static void setup() + { + try { + cleanup(); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + cleanTable(); + insertEventsInTable(10, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(FILE_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void cleanTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "delete from " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); + lma.prepareDAG(new JdbcHDFSApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(5000); + + String[] extensions = { "dat.0", "tmp" }; + Collection list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); + Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size()); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java new file mode 100644 index 0000000..91c8f27 --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcIngest/JdbcPollerApplicationTest.java @@ -0,0 +1,129 @@ +package org.apache.apex.examples.JdbcIngest; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +public class JdbcPollerApplicationTest +{ + private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + private static final String TABLE_NAME = "test_event_table"; + private static final String OUTPUT_DIR_NAME = "/tmp/test/output"; + + @BeforeClass + public static void setup() + { + try { + cleanup(); + dropTable(); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createTable = "CREATE TABLE " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + insertEventsInTable(10, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(OUTPUT_DIR_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void dropTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "DROP TABLE IF EXISTS " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl", URL); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver", DB_DRIVER); + conf.setInt("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount", 2); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key", "ACCOUNT_NO"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression", "ACCOUNT_NO,NAME,AMOUNT"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName", TABLE_NAME); + conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS", + "org.apache.apex.examples.JdbcIngest.PojoEvent"); + conf.set("dt.application.PollJdbcToHDFSApp.operator.Writer.filePath", OUTPUT_DIR_NAME); + + lma.prepareDAG(new JdbcPollerApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(45000); + + String[] extensions = { "dat.0", "tmp" }; + Collection list = FileUtils.listFiles(new File(OUTPUT_DIR_NAME), extensions, false); + int recordsCount = 0; + for (File file : list) { + recordsCount += FileUtils.readLines(file).size(); + } + Assert.assertEquals("Records in file", 10, recordsCount); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/ApplicationTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/ApplicationTest.java new file mode 100644 index 0000000..79c7235 --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/ApplicationTest.java @@ -0,0 +1,42 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.JdbcToJdbc; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode.
+ * The assumption to run this test case is that test_event_table,meta-table and + * test_output_event_table are created already + */ +public class ApplicationTest +{ + + @Test + @Ignore + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-JdbcToJdbcApp.xml")); + lma.prepareDAG(new JdbcToJdbcApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(50000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/JdbcOperatorTest.java b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/JdbcOperatorTest.java new file mode 100644 index 0000000..4ecc52d --- /dev/null +++ b/examples/jdbc/src/test/java/org/apache/apex/examples/JdbcToJdbc/JdbcOperatorTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.JdbcToJdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.lib.db.jdbc.AbstractJdbcInputOperator; +import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +/** + * Tests for {@link AbstractJdbcTransactionableOutputOperator} and + * {@link AbstractJdbcInputOperator} + */ +public class JdbcOperatorTest +{ + public static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; + public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; + + private static final String TABLE_NAME = "test_event_table"; + private static final String OUTPUT_TABLE_NAME = "test_output_event_table"; + + @BeforeClass + public static void setup() + { + try { + dropTable(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try { + Class.forName(DB_DRIVER).newInstance(); + + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " + + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " + "UNIQUE (" + + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " + ")"; + + System.out.println(createMetaTable); + stmt.executeUpdate(createMetaTable); + + String createTable = "CREATE TABLE " + TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createTable); + insertEventsInTable(10, 0); + + String createOutputTable = "CREATE TABLE " + OUTPUT_TABLE_NAME + + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; + stmt.executeUpdate(createOutputTable); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void dropTable() + { + try { + Connection con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + String cleanTable = "DROP TABLE IF EXISTS " + TABLE_NAME; + stmt.executeUpdate(cleanTable); + String cleanOutputTable = "DROP TABLE IF EXISTS " + OUTPUT_TABLE_NAME; + stmt.executeUpdate(cleanOutputTable); + + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static void insertEventsInTable(int numEvents, int offset) + { + try { + Connection con = DriverManager.getConnection(URL); + String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; + PreparedStatement stmt = con.prepareStatement(insert); + for (int i = 0; i < numEvents; i++, offset++) { + stmt.setInt(1, offset); + stmt.setString(2, "Account_Holder-" + offset); + stmt.setInt(3, (offset * 1000)); + stmt.executeUpdate(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public int getNumOfEventsInStore() + { + Connection con; + try { + con = DriverManager.getConnection(URL); + Statement stmt = con.createStatement(); + + String countQuery = "SELECT count(*) from " + OUTPUT_TABLE_NAME; + ResultSet resultSet = stmt.executeQuery(countQuery); + resultSet.next(); + return resultSet.getInt(1); + } catch (SQLException e) { + throw new RuntimeException("fetching count", e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-JdbcToJdbcApp.xml")); + lma.prepareDAG(new JdbcToJdbcApp(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for records to be added to table + Thread.sleep(5000); + + Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); + dropTable(); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/example-FileToJdbcApp.sql ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/example-FileToJdbcApp.sql b/examples/jdbc/src/test/resources/example-FileToJdbcApp.sql new file mode 100644 index 0000000..4461247 --- /dev/null +++ b/examples/jdbc/src/test/resources/example-FileToJdbcApp.sql @@ -0,0 +1,8 @@ +CREATE DATABASE IF NOT EXISTS testJdbc; + +USE testJdbc; + +CREATE TABLE IF NOT EXISTS `test_jdbc_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255), + `AMOUNT` int(11)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/example-JdbcIngest.sql ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/example-JdbcIngest.sql b/examples/jdbc/src/test/resources/example-JdbcIngest.sql new file mode 100644 index 0000000..531c659 --- /dev/null +++ b/examples/jdbc/src/test/resources/example-JdbcIngest.sql @@ -0,0 +1,24 @@ +DROP DATABASE IF EXISTS testDev; + +CREATE DATABASE testDev; + +USE testDev; + +CREATE TABLE IF NOT EXISTS `test_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL, + primary key(`ACCOUNT_NO`) +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES +(1, 'User1', 1000), +(2, 'User2', 2000), +(3, 'User3', 3000), +(4, 'User4', 4000), +(5, 'User5', 5000), +(6, 'User6', 6000), +(7, 'User7', 7000), +(8, 'User8', 8000), +(9, 'User9', 9000), +(10, 'User10', 1000); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/example-JdbcToJdbc.sql ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/example-JdbcToJdbc.sql b/examples/jdbc/src/test/resources/example-JdbcToJdbc.sql new file mode 100644 index 0000000..104240c --- /dev/null +++ b/examples/jdbc/src/test/resources/example-JdbcToJdbc.sql @@ -0,0 +1,36 @@ +DROP DATABASE IF EXISTS testDev; + +CREATE DATABASE testDev; + +USE testDev; + +CREATE TABLE IF NOT EXISTS `test_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES +(1, 'User1', 1000), +(2, 'User2', 2000), +(3, 'User3', 3000), +(4, 'User4', 4000), +(5, 'User5', 5000), +(6, 'User6', 6000), +(7, 'User7', 7000), +(8, 'User8', 8000), +(9, 'User9', 9000), +(10, 'User10', 1000); + +CREATE TABLE IF NOT EXISTS `test_output_event_table` ( + `ACCOUNT_NO` int(11) NOT NULL, + `NAME` varchar(255) DEFAULT NULL, + `AMOUNT` int(11) DEFAULT NULL +) ENGINE=MyISAM DEFAULT CHARSET=latin1; + +CREATE TABLE IF NOT EXISTS `dt_meta` ( + `dt_app_id` VARCHAR(100) NOT NULL, + `dt_operator_id` INT NOT NULL, + `dt_window` BIGINT NOT NULL, +UNIQUE (`dt_app_id`, `dt_operator_id`, `dt_window`) +) ENGINE=MyISAM DEFAULT CHARSET=latin1; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/log4j.properties b/examples/jdbc/src/test/resources/log4j.properties new file mode 100644 index 0000000..3bfcdc5 --- /dev/null +++ b/examples/jdbc/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/test-FileToJdbcApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/test-FileToJdbcApp.xml b/examples/jdbc/src/test/resources/test-FileToJdbcApp.xml new file mode 100755 index 0000000..477cdbf --- /dev/null +++ b/examples/jdbc/src/test/resources/test-FileToJdbcApp.xml @@ -0,0 +1,58 @@ + + + + dt.operator.JdbcOutput.prop.store.databaseDriver + org.hsqldb.jdbcDriver + + + + dt.operator.JdbcOutput.prop.store.databaseUrl + jdbc:hsqldb:mem:test;sql.syntax_mys=true + + + + dt.operator.JdbcOutput.prop.store.userName + sa + + + + dt.operator.JdbcOutput.prop.store.password + + + + + dt.operator.JdbcOutput.prop.batchSize + 5 + + + + dt.operator.JdbcOutput.prop.tablename + test_jdbc_table + + + + dt.operator.JdbcOutput.port.input.attr.TUPLE_CLASS + org.apache.apex.examples.FileToJdbcApp.PojoEvent + + + + dt.operator.FileReader.prop.directory + src/test/resources/test-input + + + + + + dt.application.FileToJdbcCsvParser.operator.CsvParser.port.out.attr.TUPLE_CLASS + org.apache.apex.examples.FileToJdbcApp.PojoEvent + + + + dt.application.FileToJdbcCustomParser.operator.CustomParser.prop.regexStr + , + + + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbc/src/test/resources/test-input/sample-FileToJdbc.txt ---------------------------------------------------------------------- diff --git a/examples/jdbc/src/test/resources/test-input/sample-FileToJdbc.txt b/examples/jdbc/src/test/resources/test-input/sample-FileToJdbc.txt new file mode 100644 index 0000000..362253e --- /dev/null +++ b/examples/jdbc/src/test/resources/test-input/sample-FileToJdbc.txt @@ -0,0 +1,10 @@ +1,User1,1000 +2,User2,2000 +3,User3,3000 +4,User4,4000 +5,User5,5000 +6,User6,6000 +7,User7,7000 +8,User8,8000 +9,User9,9000 +10,User10,10000 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/.gitignore ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/.gitignore b/examples/jdbcIngest/.gitignore deleted file mode 100644 index b83d222..0000000 --- a/examples/jdbcIngest/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/target/ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/README.md ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/README.md b/examples/jdbcIngest/README.md deleted file mode 100644 index ec01985..0000000 --- a/examples/jdbcIngest/README.md +++ /dev/null @@ -1,65 +0,0 @@ -## Sample mysql implementation - -This project contains two applications to read records from a table in `MySQL`, create POJOs and write them to a file -in the user specified directory in HDFS. - -1. SimpleJdbcToHDFSApp: Reads table records as per given query and emits them as POJOs. -2. PollJdbcToHDFSApp: Reads table records using partitions in parallel fashion also polls for newly **appended** records and emits them as POJOs. - -Follow these steps to run these applications: - -**Step 1**: Update these properties in the file `src/main/resources/META_INF/properties-.xml`: - -| Property Name | Description | -| ------------- | ----------- | -| dt.application..operator.JdbcInput.prop.store.databaseUrl | database URL of the form `jdbc:mysql://hostName:portNumber/dbName` | -| dt.application..operator.JdbcInput.prop.store.userName | MySQL user name | -| dt.application..operator.JdbcInput.prop.store.password | MySQL user password | -| dt.application..operator.FileOutputOperator.filePath | HDFS output directory path | - -**Step 2**: Create database table and add entries - -Go to the MySQL console and run (where _{path}_ is a suitable prefix): - - mysql> source {path}/src/test/resources/example.sql - -After this, please verify that `testDev.test_event_table` is created and has 10 rows: - - mysql> select count(*) from testDev.test_event_table; - +----------+ - | count(*) | - +----------+ - | 10 | - +----------+ - -**Step 3**: Create HDFS output directory if not already present (_{path}_ should be the same as specified in `META_INF/properties-.xml`): - - hadoop fs -mkdir -p {path} - -**Step 4**: Build the code: - - shell> mvn clean install - -Upload the `target/jdbcInput-1.0-SNAPSHOT.apa` to the UI console if available or launch it from -the commandline using `apexcli`. - -**Step 5**: During launch use `src/main/resources/META_INF/properties-.xml` as a custom configuration file; then verify -that the output directory has the expected output: - - shell> hadoop fs -cat /2_op.dat.* | wc -l - -This should return 10 as the count. - -Sample Output: - - hadoop fs -cat /2_op.dat.0 - PojoEvent [accountNumber=1, name=User1, amount=1000] - PojoEvent [accountNumber=2, name=User2, amount=2000] - PojoEvent [accountNumber=3, name=User3, amount=3000] - PojoEvent [accountNumber=4, name=User4, amount=4000] - PojoEvent [accountNumber=5, name=User5, amount=5000] - PojoEvent [accountNumber=6, name=User6, amount=6000] - PojoEvent [accountNumber=7, name=User7, amount=7000] - PojoEvent [accountNumber=8, name=User8, amount=8000] - PojoEvent [accountNumber=9, name=User9, amount=9000] - PojoEvent [accountNumber=10, name=User10, amount=1000] http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl b/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl deleted file mode 100644 index 08075a9..0000000 --- a/examples/jdbcIngest/XmlJavadocCommentsExtractor.xsl +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - - - - - - - - - - - - - - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/pom.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/pom.xml b/examples/jdbcIngest/pom.xml deleted file mode 100644 index f9288b8..0000000 --- a/examples/jdbcIngest/pom.xml +++ /dev/null @@ -1,298 +0,0 @@ - - - 4.0.0 - - com.example - 1.0-SNAPSHOT - jdbcInput - jar - - - JDBC Input Operator - Example Uses of JDBC Input Operator - - - - 3.5.0 - lib/*.jar - 3.6.0 - - - - - - org.apache.maven.plugins - maven-eclipse-plugin - 2.9 - - true - - - - maven-compiler-plugin - 3.3 - - UTF-8 - 1.7 - 1.7 - true - false - true - true - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - prepare-package - - copy-dependencies - - - target/deps - runtime - - - - - - - maven-assembly-plugin - - - app-package-assembly - package - - single - - - ${project.artifactId}-${project.version}-apexapp - false - - src/assemble/appPackage.xml - - - 0755 - - - - ${apex.apppackage.classpath} - ${apex.version} - ${project.groupId} - ${project.artifactId} - ${project.version} - ${project.name} - ${project.description} - - - - - - - - - maven-antrun-plugin - 1.7 - - - package - - - - - - - run - - - - - createJavadocDirectory - generate-resources - - - - - - - - run - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 1.9.1 - - - attach-artifacts - package - - attach-artifact - - - - - target/${project.artifactId}-${project.version}.apa - apa - - - false - - - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - - xml-doclet - generate-resources - - javadoc - - - com.github.markusbernhardt.xmldoclet.XmlDoclet - -d - ${project.build.directory}/generated-resources/xml-javadoc - -filename - ${project.artifactId}-${project.version}-javadoc.xml - false - - com.github.markusbernhardt - xml-doclet - 1.0.4 - - - - - - - - org.codehaus.mojo - xml-maven-plugin - 1.0 - - - transform-xmljavadoc - generate-resources - - transform - - - - - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - XmlJavadocCommentsExtractor.xsl - ${project.build.directory}/generated-resources/xml-javadoc - - - - - - - maven-resources-plugin - 2.6 - - - copy-resources - process-resources - - copy-resources - - - ${basedir}/target/classes - - - ${project.build.directory}/generated-resources/xml-javadoc - - ${project.artifactId}-${project.version}-javadoc.xml - - true - - - - - - - - - - - - - - - org.apache.apex - malhar-library - ${malhar.version} - - - * - * - - - - - org.apache.apex - apex-common - ${apex.version} - provided - - - junit - junit - 4.10 - test - - - org.apache.apex - apex-engine - ${apex.version} - test - - - mysql - mysql-connector-java - 5.1.36 - - - org.jooq - jooq - 3.6.4 - - - org.codehaus.janino - janino - 2.7.8 - - - org.hsqldb - hsqldb - 2.3.1 - - - - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/assemble/appPackage.xml b/examples/jdbcIngest/src/assemble/appPackage.xml deleted file mode 100644 index 7ad071c..0000000 --- a/examples/jdbcIngest/src/assemble/appPackage.xml +++ /dev/null @@ -1,43 +0,0 @@ - - appPackage - - jar - - false - - - ${basedir}/target/ - /app - - ${project.artifactId}-${project.version}.jar - - - - ${basedir}/target/deps - /lib - - - ${basedir}/src/site/conf - /conf - - *.xml - - - - ${basedir}/src/main/resources/META-INF - /META-INF - - - ${basedir}/src/main/resources/app - /app - - - ${basedir}/src/main/resources/resources - /resources - - - - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java deleted file mode 100644 index e155f23..0000000 --- a/examples/jdbcIngest/src/main/java/com/example/mydtapp/FileLineOutputOperator.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.example.mydtapp; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -public class FileLineOutputOperator extends AbstractFileOutputOperator -{ - @Override - protected String getFileName(Object input) - { - return context.getId() + "_" + "op.dat"; - } - - @Override - protected byte[] getBytesForTuple(Object input) - { - return (input.toString() + "\n").getBytes(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java deleted file mode 100644 index 5605bcf..0000000 --- a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcHDFSApp.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.example.mydtapp; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; -import com.datatorrent.lib.db.jdbc.JdbcStore; -import com.datatorrent.lib.util.FieldInfo; -import com.datatorrent.lib.util.FieldInfo.SupportType; - -@ApplicationAnnotation(name = "SimpleJdbcToHDFSApp") -public class JdbcHDFSApp implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator()); - /** - * The class given below can be updated to the user defined class based on - * input table schema The addField infos method needs to be updated - * accordingly This line can be commented and class can be set from the - * properties file - */ - // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class); - - jdbcInputOperator.setFieldInfos(addFieldInfos()); - - JdbcStore store = new JdbcStore(); - jdbcInputOperator.setStore(store); - - FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator()); - - dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL); - } - - /** - * This method can be modified to have field mappings based on used defined - * class - */ - private List addFieldInfos() - { - List fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); - fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); - fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); - return fieldInfos; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java deleted file mode 100644 index 54d71f7..0000000 --- a/examples/jdbcIngest/src/main/java/com/example/mydtapp/JdbcPollerApplication.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.example.mydtapp; - -import java.util.List; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.db.jdbc.JdbcPOJOPollInputOperator; -import com.datatorrent.lib.db.jdbc.JdbcStore; -import com.datatorrent.lib.util.FieldInfo; -import com.datatorrent.lib.util.FieldInfo.SupportType; -import com.google.common.collect.Lists; - -@ApplicationAnnotation(name = "PollJdbcToHDFSApp") -public class JdbcPollerApplication implements StreamingApplication -{ - public void populateDAG(DAG dag, Configuration conf) - { - JdbcPOJOPollInputOperator poller = dag.addOperator("JdbcPoller", new JdbcPOJOPollInputOperator()); - - JdbcStore store = new JdbcStore(); - poller.setStore(store); - - poller.setFieldInfos(addFieldInfos()); - - FileLineOutputOperator writer = dag.addOperator("Writer", new FileLineOutputOperator()); - dag.setInputPortAttribute(writer.input, PortContext.PARTITION_PARALLEL, true); - writer.setRotationWindows(60); - - dag.addStream("dbrecords", poller.outputPort, writer.input); - } - - /** - * This method can be modified to have field mappings based on used defined - * class - */ - private List addFieldInfos() - { - List fieldInfos = Lists.newArrayList(); - fieldInfos.add(new FieldInfo("ACCOUNT_NO", "accountNumber", SupportType.INTEGER)); - fieldInfos.add(new FieldInfo("NAME", "name", SupportType.STRING)); - fieldInfos.add(new FieldInfo("AMOUNT", "amount", SupportType.INTEGER)); - return fieldInfos; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java b/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java deleted file mode 100644 index f56522b..0000000 --- a/examples/jdbcIngest/src/main/java/com/example/mydtapp/PojoEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.example.mydtapp; - -public class PojoEvent -{ - @Override - public String toString() - { - return "PojoEvent [accountNumber=" + accountNumber + ", name=" + name + ", amount=" + amount + "]"; - } - - private int accountNumber; - private String name; - private int amount; - - public int getAccountNumber() - { - return accountNumber; - } - - public void setAccountNumber(int accountNumber) - { - this.accountNumber = accountNumber; - } - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public int getAmount() - { - return amount; - } - - public void setAmount(int amount) - { - this.amount = amount; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml deleted file mode 100644 index 6e7aaf6..0000000 --- a/examples/jdbcIngest/src/main/resources/META-INF/properties-PollJdbcToHDFSApp.xml +++ /dev/null @@ -1,73 +0,0 @@ - - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount - 2 - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver - com.mysql.jdbc.Driver - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl - jdbc:mysql://localhost:3306/testDev - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.userName - root - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.password - mysql - - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.batchSize - 300 - - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key - ACCOUNT_NO - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression - ACCOUNT_NO,NAME,AMOUNT - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS - com.example.mydtapp.PojoEvent - - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName - test_event_table - - - - dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.pollInterval - 1000 - - - - - dt.application.PollJdbcToHDFSApp.operator.Writer.filePath - /tmp/test/output - - - - dt.loggers.level - com.datatorrent.*:DEBUG,org.apache.*:INFO - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml b/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml deleted file mode 100644 index 9fce7f8..0000000 --- a/examples/jdbcIngest/src/main/resources/META-INF/properties-SimpleJdbcToHDFSApp.xml +++ /dev/null @@ -1,66 +0,0 @@ - - - - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseDriver - - org.hsqldb.jdbcDriver - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.store.databaseUrl - - jdbc:hsqldb:mem:test - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.fetchSize - - 50 - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.query - - select * from test_event_table - - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.prop.tableName - - test_event_table - - - - - dt.application.SimpleJdbcToHDFSApp.operator.JdbcInput.port.outputPort.attr.TUPLE_CLASS - - com.example.mydtapp.PojoEvent - - - - - dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.filePath - - /tmp/jdbcApp - - - - dt.application.SimpleJdbcToHDFSApp.operator.FileOutputOperator.rotationWindows - - 5 - - - - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java deleted file mode 100644 index fb78944..0000000 --- a/examples/jdbcIngest/src/test/java/com/example/mydtapp/ApplicationTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.example.mydtapp; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode.
- * The assumption to run this test case is that test_event_table is created - * already - */ -public class ApplicationTest -{ - - @Test - @Ignore - public void testApplication() throws IOException, Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); - lma.prepareDAG(new JdbcHDFSApp(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); // runs for 10 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java deleted file mode 100644 index 1d95f4d..0000000 --- a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcInputAppTest.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.example.mydtapp; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; - -import javax.validation.ConstraintViolationException; - -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * Application test for {@link JdbcHDFSApp} - */ -public class JdbcInputAppTest -{ - private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; - private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - private static final String TABLE_NAME = "test_event_table"; - private static final String FILE_NAME = "/tmp/jdbcApp"; - - @BeforeClass - public static void setup() - { - try { - cleanup(); - } catch (Exception e) { - throw new RuntimeException(e); - } - try { - Class.forName(DB_DRIVER).newInstance(); - - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; - stmt.executeUpdate(createTable); - cleanTable(); - insertEventsInTable(10, 0); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @AfterClass - public static void cleanup() - { - try { - FileUtils.deleteDirectory(new File(FILE_NAME)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static void cleanTable() - { - try { - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - String cleanTable = "delete from " + TABLE_NAME; - stmt.executeUpdate(cleanTable); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public static void insertEventsInTable(int numEvents, int offset) - { - try { - Connection con = DriverManager.getConnection(URL); - String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; - PreparedStatement stmt = con.prepareStatement(insert); - for (int i = 0; i < numEvents; i++, offset++) { - stmt.setInt(1, offset); - stmt.setString(2, "Account_Holder-" + offset); - stmt.setInt(3, (offset * 1000)); - stmt.executeUpdate(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testApplication() throws Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-SimpleJdbcToHDFSApp.xml")); - lma.prepareDAG(new JdbcHDFSApp(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - - // wait for output files to roll - Thread.sleep(5000); - - String[] extensions = { "dat.0", "tmp" }; - Collection list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); - Assert.assertEquals("Records in file", 10, FileUtils.readLines(list.iterator().next()).size()); - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java b/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java deleted file mode 100644 index b96d4ae..0000000 --- a/examples/jdbcIngest/src/test/java/com/example/mydtapp/JdbcPollerApplicationTest.java +++ /dev/null @@ -1,128 +0,0 @@ -package com.example.mydtapp; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collection; - -import javax.validation.ConstraintViolationException; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; - -public class JdbcPollerApplicationTest -{ - private static final String DB_DRIVER = "org.hsqldb.jdbcDriver"; - private static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; - private static final String TABLE_NAME = "test_event_table"; - private static final String OUTPUT_DIR_NAME = "/tmp/test/output"; - - @BeforeClass - public static void setup() - { - try { - cleanup(); - } catch (Exception e) { - throw new RuntimeException(e); - } - try { - Class.forName(DB_DRIVER).newInstance(); - - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - - String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME - + " (ACCOUNT_NO INTEGER, NAME VARCHAR(255),AMOUNT INTEGER)"; - stmt.executeUpdate(createTable); - cleanTable(); - insertEventsInTable(10, 0); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @AfterClass - public static void cleanup() - { - try { - FileUtils.deleteDirectory(new File(OUTPUT_DIR_NAME)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public static void cleanTable() - { - try { - Connection con = DriverManager.getConnection(URL); - Statement stmt = con.createStatement(); - String cleanTable = "delete from " + TABLE_NAME; - stmt.executeUpdate(cleanTable); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public static void insertEventsInTable(int numEvents, int offset) - { - try { - Connection con = DriverManager.getConnection(URL); - String insert = "insert into " + TABLE_NAME + " values (?,?,?)"; - PreparedStatement stmt = con.prepareStatement(insert); - for (int i = 0; i < numEvents; i++, offset++) { - stmt.setInt(1, offset); - stmt.setString(2, "Account_Holder-" + offset); - stmt.setInt(3, (offset * 1000)); - stmt.executeUpdate(); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testApplication() throws Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseUrl", URL); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.store.databaseDriver", DB_DRIVER); - conf.setInt("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.partitionCount", 2); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.key", "ACCOUNT_NO"); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.columnsExpression", "ACCOUNT_NO,NAME,AMOUNT"); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.prop.tableName", TABLE_NAME); - conf.set("dt.application.PollJdbcToHDFSApp.operator.JdbcPoller.port.outputPort.attr.TUPLE_CLASS", - "com.example.mydtapp.PojoEvent"); - conf.set("dt.application.PollJdbcToHDFSApp.operator.Writer.filePath", OUTPUT_DIR_NAME); - - lma.prepareDAG(new JdbcPollerApplication(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - - // wait for output files to roll - Thread.sleep(5000); - - String[] extensions = { "dat.0", "tmp" }; - Collection list = FileUtils.listFiles(new File(OUTPUT_DIR_NAME), extensions, false); - int recordsCount = 0; - for (File file : list) { - recordsCount += FileUtils.readLines(file).size(); - } - Assert.assertEquals("Records in file", 10, recordsCount); - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b99e0ae/examples/jdbcIngest/src/test/resources/example.sql ---------------------------------------------------------------------- diff --git a/examples/jdbcIngest/src/test/resources/example.sql b/examples/jdbcIngest/src/test/resources/example.sql deleted file mode 100644 index 531c659..0000000 --- a/examples/jdbcIngest/src/test/resources/example.sql +++ /dev/null @@ -1,24 +0,0 @@ -DROP DATABASE IF EXISTS testDev; - -CREATE DATABASE testDev; - -USE testDev; - -CREATE TABLE IF NOT EXISTS `test_event_table` ( - `ACCOUNT_NO` int(11) NOT NULL, - `NAME` varchar(255) DEFAULT NULL, - `AMOUNT` int(11) DEFAULT NULL, - primary key(`ACCOUNT_NO`) -) ENGINE=MyISAM DEFAULT CHARSET=latin1; - -INSERT INTO `test_event_table` (`ACCOUNT_NO`, `NAME`, `AMOUNT`) VALUES -(1, 'User1', 1000), -(2, 'User2', 2000), -(3, 'User3', 3000), -(4, 'User4', 4000), -(5, 'User5', 5000), -(6, 'User6', 6000), -(7, 'User7', 7000), -(8, 'User8', 8000), -(9, 'User9', 9000), -(10, 'User10', 1000);