apex-commits mailing list archives

From t..@apache.org
Subject [1/4] apex-malhar git commit: APEXMALHAR-1818 SQL Support for converting given SQL statement to APEX DAG.
Date Fri, 21 Oct 2016 16:58:26 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master a05980579 -> c92ca15e8
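
This commit adds the test suite for the new sql module, whose SQLExecEnvironment translates a SQL statement (Calcite streaming-SQL dialect, per the calcite logger configuration below) into an Apex LogicalPlan. All of the tests below follow the same fluent pattern; a minimal sketch, where "dummyFilePath" and the schema string are illustrative placeholders taken from the tests themselves:

    // Sketch of the flow exercised by the tests in this commit.
    LogicalPlan dag = new LogicalPlan();
    SQLExecEnvironment.getEnvironment()
        .registerTable("ORDERS", new FileEndpoint("dummyFilePath", new CSVMessageFormat(schema)))
        .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
    dag.validate();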


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
new file mode 100644
index 0000000..62d2a4d
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/SerDeTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.CSVMessageFormat;
+import org.apache.apex.malhar.sql.table.Endpoint;
+import org.apache.apex.malhar.sql.table.FileEndpoint;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+
+public class SerDeTest
+{
+  @Test
+  public void testSQLWithApexFactory() throws IOException, ClassNotFoundException
+  {
+    File modelFile = new File("src/test/resources/model/model_file_csv.json");
+    String model = FileUtils.readFileToString(modelFile);
+
+    LogicalPlan dag = new LogicalPlan();
+    SQLExecEnvironment.getEnvironment()
+        .withModel(model)
+        .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");
+
+    dag.validate();
+  }
+
+  @Test
+  public void testSQLWithAPI() throws ClassNotFoundException, IOException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schema = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+
+    Endpoint endpoint = new FileEndpoint("dummyFilePath", new CSVMessageFormat(schema));
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", endpoint)
+        .executeSQL(dag, "SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) FROM ORDERS WHERE id > 3");
+
+    dag.validate();
+  }
+
+  @Test
+  public void testSQLSelectInsertWithAPI() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}}," +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", new FileEndpoint("dummyFilePathInput", new CSVMessageFormat(schemaIn)))
+        .registerTable("SALES", new FileEndpoint("dummyFilePathOutput", "out.tmp", new CSVMessageFormat(schemaOut)))
+        .executeSQL(dag, "INSERT INTO SALES SELECT STREAM FLOOR(ROWTIME TO HOUR), SUBSTRING(PRODUCT, 0, 5) " +
+        "FROM ORDERS WHERE id > 3");
+
+    dag.validate();
+  }
+
+  @Test
+  public void testJoin() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+    String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Category\",\"type\":\"String\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A " +
+        "JOIN CATEGORY AS B ON A.id = B.id " +
+        "WHERE A.id > 3 AND A.PRODUCT LIKE 'paint%'";
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat(schemaIn0)))
+        .registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat(schemaIn1)))
+        .registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat(schemaOut)))
+        .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+        .executeSQL(dag, sql);
+
+    dag.validate();
+  }
+
+  @Test
+  public void testJoinFilter() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+    String schemaIn0 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaIn1 = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Category\",\"type\":\"String\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"Category\",\"type\":\"String\"}]}";
+
+    String sql = "INSERT INTO SALES SELECT STREAM A.ROWTIME, FLOOR(A.ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(A.PRODUCT, 6, 7)), B.CATEGORY " +
+        "FROM ORDERS AS A JOIN CATEGORY AS B ON A.id = B.id AND A.id > 3 " +
+        "WHERE A.PRODUCT LIKE 'paint%'";
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", new KafkaEndpoint("localhost:9092", "testdata0", new CSVMessageFormat(schemaIn0)))
+        .registerTable("CATEGORY", new KafkaEndpoint("localhost:9092", "testdata1", new CSVMessageFormat(schemaIn1)))
+        .registerTable("SALES", new KafkaEndpoint("localhost:9092", "testresult", new CSVMessageFormat(schemaOut)))
+        .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+        .executeSQL(dag, sql);
+
+    dag.validate();
+  }
+
+  @Test
+  public void testPortEndpoint() throws IOException, ClassNotFoundException
+  {
+    LogicalPlan dag = new LogicalPlan();
+
+    String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"id\",\"type\":\"Integer\"}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}," +
+        "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+    String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+        "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss Z\"}}," +
+        "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+    KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+    kafkaInput.setTopics("testdata0");
+    kafkaInput.setInitialOffset("EARLIEST");
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+    kafkaInput.setConsumerProps(props);
+    kafkaInput.setClusters("localhost:9092");
+
+    CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+    csvParser.setSchema(schemaIn);
+
+    dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+    CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+    formatter.setSchema(schemaOut);
+
+    KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+    kafkaOutput.setTopic("testresult");
+
+    props = new Properties();
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    kafkaOutput.setProperties(props);
+
+    dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
+    SQLExecEnvironment.getEnvironment()
+        .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+        .registerTable("SALES", new StreamEndpoint(formatter.in, OutputPOJO.class))
+        .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+        .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+        "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+        "PRODUCT LIKE 'paint%'");
+
+    dag.validate();
+  }
+}
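
For readability: the escaped schema strings throughout SerDeTest all share one CSV schema shape, consumed by CSVMessageFormat and CsvParser. Pretty-printed here, with the field names and date format taken from the tests above:

    {
      "separator": ",",
      "quoteChar": "\"",
      "fields": [
        {"name": "RowTime", "type": "Date", "constraints": {"format": "dd/MM/yyyy hh:mm:ss Z"}},
        {"name": "id", "type": "Integer"},
        {"name": "Product", "type": "String"},
        {"name": "units", "type": "Integer"}
      ]
    }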

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
new file mode 100644
index 0000000..1e9a1f8
--- /dev/null
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/StreamEndpointTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sql;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.TimeZone;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.apex.malhar.kafka.EmbeddedKafka;
+import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
+import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;
+import org.apache.apex.malhar.sql.table.KafkaEndpoint;
+import org.apache.apex.malhar.sql.table.StreamEndpoint;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import com.google.common.collect.ImmutableMap;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.contrib.formatter.CsvFormatter;
+import com.datatorrent.contrib.parser.CsvParser;
+
+public class StreamEndpointTest
+{
+  private String testTopicData0 = "dataTopic0";
+  private String testTopicResult = "resultTopic";
+
+  private EmbeddedKafka kafka;
+
+  private TimeZone defaultTZ;
+
+  @Before
+  public void setup() throws IOException
+  {
+    defaultTZ = TimeZone.getDefault();
+    TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
+
+    kafka = new EmbeddedKafka();
+    kafka.start();
+    kafka.createTopic(testTopicData0);
+    kafka.createTopic(testTopicResult);
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    kafka.stop();
+
+    TimeZone.setDefault(defaultTZ);
+  }
+
+  @Test
+  public void testApplicationWithPortEndpoint()
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new KafkaPortApplication(kafka.getBroker(), testTopicData0, testTopicResult), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      kafka.publish(testTopicData0, Arrays.asList("15/02/2016 10:15:00 +0000,1,paint1,11",
+          "15/02/2016 10:16:00 +0000,2,paint2,12",
+          "15/02/2016 10:17:00 +0000,3,paint3,13", "15/02/2016 10:18:00 +0000,4,paint4,14",
+          "15/02/2016 10:19:00 +0000,5,paint5,15", "15/02/2016 10:10:00 +0000,6,abcde6,16"));
+
+      // TODO: Workaround to add \r\n char to test results because of bug in CsvFormatter which adds new line char.
+      String[] expectedLines = new String[] {"15/02/2016 10:18:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT4\r\n",
+          "15/02/2016 10:19:00 +0000,15/02/2016 00:00:00 +0000,OILPAINT5\r\n"};
+
+      List<String> consume = kafka.consume(testTopicResult, 30000);
+
+      Assert.assertTrue(Arrays.deepEquals(consume.toArray(new String[consume.size()]), expectedLines));
+
+      lc.shutdown();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    } catch (Exception e) {
+      Assert.fail("Exception: " + e);
+    }
+  }
+
+  public static class KafkaPortApplication implements StreamingApplication
+  {
+    private String broker;
+    private String sourceTopic;
+    private String destTopic;
+
+    public KafkaPortApplication(String broker, String sourceTopic, String destTopic)
+    {
+      this.broker = broker;
+      this.sourceTopic = sourceTopic;
+      this.destTopic = destTopic;
+    }
+
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      String schemaIn = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"id\",\"type\":\"Integer\"}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}," +
+          "{\"name\":\"units\",\"type\":\"Integer\"}]}";
+      String schemaOut = "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[" +
+          "{\"name\":\"RowTime1\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"RowTime2\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy HH:mm:ss Z\"}}," +
+          "{\"name\":\"Product\",\"type\":\"String\"}]}";
+
+      KafkaSinglePortInputOperator kafkaInput = dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
+      kafkaInput.setTopics(sourceTopic);
+      kafkaInput.setInitialOffset("EARLIEST");
+      Properties props = new Properties();
+      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_DESERIALIZER);
+      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_DESERIALIZER);
+      kafkaInput.setConsumerProps(props);
+      kafkaInput.setClusters(broker);
+
+      CsvParser csvParser = dag.addOperator("CSVParser", CsvParser.class);
+      csvParser.setSchema(schemaIn);
+
+      dag.addStream("KafkaToCSV", kafkaInput.outputPort, csvParser.in);
+
+      CsvFormatter formatter = dag.addOperator("CSVFormatter", CsvFormatter.class);
+      formatter.setSchema(schemaOut);
+
+      KafkaSinglePortOutputOperator kafkaOutput = dag.addOperator("KafkaOutput", KafkaSinglePortOutputOperator.class);
+      kafkaOutput.setTopic(destTopic);
+
+      props = new Properties();
+      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.VALUE_SERIALIZER);
+      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaEndpoint.KEY_SERIALIZER);
+      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+      kafkaOutput.setProperties(props);
+
+      dag.addStream("CSVToKafka", formatter.out, kafkaOutput.inputPort);
+
+      SQLExecEnvironment.getEnvironment()
+          .registerTable("ORDERS", new StreamEndpoint(csvParser.out, InputPOJO.class))
+          .registerTable("SALES", new StreamEndpoint(formatter.in,
+          ImmutableMap.<String, Class>of("RowTime1", Date.class, "RowTime2", Date.class, "Product", String.class)))
+          .registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str")
+          .executeSQL(dag, "INSERT INTO SALES " + "SELECT STREAM ROWTIME, " + "FLOOR(ROWTIME TO DAY), " +
+          "APEXCONCAT('OILPAINT', SUBSTRING(PRODUCT, 6, 7)) " + "FROM ORDERS WHERE ID > 3 " + "AND " +
+          "PRODUCT LIKE 'paint%'");
+    }
+  }
+}
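
Both test classes register a scalar UDF via registerFunction("APEXCONCAT", FileEndpointTest.class, "apex_concat_str"). FileEndpointTest itself is not part of this hunk; judging from the two-argument APEXCONCAT(...) calls in the SQL above, the registered method would plausibly be a public static helper along these lines (hypothetical sketch, not the actual implementation):

    // Hypothetical UDF shape for APEXCONCAT: the real apex_concat_str
    // lives in FileEndpointTest, which is not shown in this commit mail.
    public static String apex_concat_str(String s1, String s2)
    {
      return s1 + s2;
    }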

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java b/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java
index 8fcb7f8..b5cd378 100644
--- a/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java
+++ b/sql/src/test/java/org/apache/apex/malhar/sql/codegen/BeanClassGeneratorTest.java
@@ -1,6 +1,20 @@
 /**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.apex.malhar.sql.codegen;
 
@@ -17,6 +31,7 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.utils.ClassLoaderUtils;
 import org.apache.apex.malhar.sql.schema.TupleSchemaRegistry;
 
 import static org.junit.Assert.assertEquals;
@@ -60,7 +75,7 @@ public class BeanClassGeneratorTest
 
     byte[] beanClass = BeanClassGenerator.createAndWriteBeanClass(addressClassName, schema.fieldList);
 
-    Class<?> clazz = BeanClassGenerator.readBeanClass(addressClassName, beanClass);
+    Class<?> clazz = ClassLoaderUtils.readBeanClass(addressClassName, beanClass);
 
     Object o = clazz.newInstance();
     Field f = clazz.getDeclaredField("streetNumber");
@@ -91,7 +106,7 @@ public class BeanClassGeneratorTest
 
     byte[] beanClass = BeanClassGenerator.createAndWriteBeanClass(addressClassName, schema.fieldList);
 
-    Class<?> clazz = BeanClassGenerator.readBeanClass(addressClassName, beanClass);
+    Class<?> clazz = ClassLoaderUtils.readBeanClass(addressClassName, beanClass);
 
     Object o = clazz.newInstance();
     Field f = clazz.getDeclaredField("streetNumber");
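    
These hunks move readBeanClass from BeanClassGenerator to the shared ClassLoaderUtils. The actual ClassLoaderUtils code is not in this hunk; as a rough sketch of what such a helper conventionally does, it defines a Class directly from the generated bytecode:

    // Assumed shape of a readBeanClass(name, bytes) helper: define the
    // generated bean class from its bytecode via a throwaway ClassLoader.
    // The real ClassLoaderUtils implementation may differ.
    public static Class<?> readBeanClass(final String name, final byte[] bytes)
    {
      return new ClassLoader()
      {
        Class<?> define()
        {
          return defineClass(name, bytes, 0, bytes.length);
        }
      }.define();
    }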

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/input.csv
----------------------------------------------------------------------
diff --git a/sql/src/test/resources/input.csv b/sql/src/test/resources/input.csv
new file mode 100644
index 0000000..c4786d1
--- /dev/null
+++ b/sql/src/test/resources/input.csv
@@ -0,0 +1,6 @@
+15/02/2016 10:15:00 +0000,1,paint1,11
+15/02/2016 10:16:00 +0000,2,paint2,12
+15/02/2016 10:17:00 +0000,3,paint3,13
+15/02/2016 10:18:00 +0000,4,paint4,14
+15/02/2016 10:19:00 +0000,5,paint5,15
+15/02/2016 10:10:00 +0000,6,abcde6,16

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/src/test/resources/log4j.properties b/sql/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8ea3cfe
--- /dev/null
+++ b/sql/src/test/resources/log4j.properties
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=WARN
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.calcite=WARN
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.kafka.consumer=WARN

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c92ca15e/sql/src/test/resources/model/model_file_csv.json
----------------------------------------------------------------------
diff --git a/sql/src/test/resources/model/model_file_csv.json b/sql/src/test/resources/model/model_file_csv.json
new file mode 100644
index 0000000..beba18d
--- /dev/null
+++ b/sql/src/test/resources/model/model_file_csv.json
@@ -0,0 +1,27 @@
+{
+  "version": "1.0",
+  "defaultSchema": "APEX",
+  "schemas": [{
+    "name": "APEX",
+    "tables": [
+      {
+        "name": "ORDERS",
+        "type": "custom",
+        "factory": "org.apache.apex.malhar.sql.schema.ApexSQLTableFactory",
+        "stream": {
+        "stream": true
+        },
+        "operand": {
+          "endpoint": "file",
+          "messageFormat": "csv",
+          "endpointOperands": {
+            "directory": "src/test/resources/input.csv"
+          },
+          "messageFormatOperands": {
+            "schema": "{\"separator\":\",\",\"quoteChar\":\"\\\"\",\"fields\":[{\"name\":\"RowTime\",\"type\":\"Date\",\"constraints\":{\"format\":\"dd/MM/yyyy hh:mm:ss\"}},{\"name\":\"id\",\"type\":\"Integer\"},{\"name\":\"Product\",\"type\":\"String\"},{\"name\":\"units\",\"type\":\"Integer\"}]}"
+          }
+        }
+      }
+    ]
+  }]
+}
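
This model file is what testSQLWithApexFactory in SerDeTest feeds to withModel(...); its "operand" block is the declarative counterpart of the programmatic registration used in the other tests. Roughly (sketch; schema stands for the escaped JSON string in the "schema" operand above):

    // Equivalent programmatic registration for the ORDERS table above.
    SQLExecEnvironment.getEnvironment()
        .registerTable("ORDERS", new FileEndpoint("src/test/resources/input.csv", new CSVMessageFormat(schema)))
        .executeSQL(dag, "SELECT STREAM ROWTIME, PRODUCT FROM ORDERS");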

