apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chin...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2157 Json formatter improvements
Date Thu, 28 Jul 2016 08:30:30 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master d06b2d987 -> a185fef04


APEXMALHAR-2157 Json formatter improvements


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9e9fe76f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9e9fe76f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9e9fe76f

Branch: refs/heads/master
Commit: 9e9fe76fefb8f5436dfd9998181802d42e98cad6
Parents: aaa4464
Author: shubham <shubham-pathak22@github.com>
Authored: Mon Jul 18 14:46:46 2016 +0530
Committer: shubham <shubham-pathak22@github.com>
Committed: Mon Jul 25 17:04:05 2016 +0530

----------------------------------------------------------------------
 library/pom.xml                                 |   5 +
 .../datatorrent/lib/formatter/Formatter.java    |  40 +++++
 .../lib/formatter/JsonFormatter.java            |  62 ++-----
 .../lib/formatter/JsonFormatterTest.java        | 162 +++++++++++++------
 4 files changed, 167 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index e9f64c8..dae1a38 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -351,6 +351,11 @@
       <version>1.10.73</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+	  <groupId>com.fasterxml.jackson.core</groupId>
+	  <artifactId>jackson-databind</artifactId>
+	  <version>2.5.4</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
index 25c0b96..db8dbc4 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java
@@ -18,6 +18,9 @@
  */
 package com.datatorrent.lib.formatter;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datatorrent.api.AutoMetric;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -48,6 +51,13 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte
 {
   protected transient Class<?> clazz;
 
+  @AutoMetric
+  private long errorTupleCount;
+  @AutoMetric
+  private long emittedObjectCount;
+  @AutoMetric
+  private long incomingTuplesCount;
+
   @OutputPortFieldAnnotation
   public transient DefaultOutputPort<OUTPUT> out = new DefaultOutputPort<OUTPUT>();
 
@@ -65,17 +75,28 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte
     @Override
     public void process(Object inputTuple)
     {
+      incomingTuplesCount++;
       OUTPUT tuple = convert(inputTuple);
       if (tuple == null && err.isConnected()) {
+        errorTupleCount++;
         err.emit(inputTuple);
         return;
       }
       if (out.isConnected()) {
+        emittedObjectCount++;
         out.emit(tuple);
       }
     }
   };
 
+  @Override
+  public void beginWindow(long windowId)
+  {
+    errorTupleCount = 0;
+    emittedObjectCount = 0;
+    incomingTuplesCount = 0;
+  }
+
   /**
    * Get the class that needs to be formatted
    * 
@@ -95,4 +116,23 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte
   {
     this.clazz = clazz;
   }
+
+  @VisibleForTesting
+  protected long getErrorTupleCount()
+  {
+    return errorTupleCount;
+  }
+
+  @VisibleForTesting
+  protected long getEmittedObjectCount()
+  {
+    return emittedObjectCount;
+  }
+
+  @VisibleForTesting
+  protected long getIncomingTuplesCount()
+  {
+    return incomingTuplesCount;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
index fa17fda..840b550 100644
--- a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
+++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java
@@ -16,26 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package com.datatorrent.lib.formatter;
 
-import java.io.IOException;
-import java.text.SimpleDateFormat;
+package com.datatorrent.lib.formatter;
 
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.map.SerializationConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * Operator that converts POJO to JSON string <br>
- * <b>Properties</b> <br>
- * <b>dateFormat</b>: date format e.g dd/MM/yyyy
  * 
  * @displayName JsonFormatter
  * @category Formatter
@@ -45,58 +38,27 @@ import com.datatorrent.netlet.util.DTThrowable;
 @org.apache.hadoop.classification.InterfaceStability.Evolving
 public class JsonFormatter extends Formatter<String>
 {
-  private transient ObjectWriter writer;
-  protected String dateFormat;
+  private transient ObjectMapper objMapper;
 
   @Override
   public void setup(OperatorContext context)
   {
-    try {
-      ObjectMapper mapper = new ObjectMapper();
-      if (dateFormat != null) {
-        mapper.setDateFormat(new SimpleDateFormat(dateFormat));
-      }
-      writer = mapper.writerWithType(clazz);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, true);
-      mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, true);
-    } catch (Throwable e) {
-      throw new RuntimeException("Unable find provided class");
-    }
+    objMapper = new ObjectMapper();
   }
 
   @Override
   public String convert(Object tuple)
   {
+    if (tuple == null) {
+      return null;
+    }
     try {
-      return writer.writeValueAsString(tuple);
-    } catch (JsonGenerationException | JsonMappingException e) {
-      logger.debug("Error while converting tuple {} {}",tuple,e.getMessage());
-    } catch (IOException e) {
-      DTThrowable.rethrow(e);
+      return objMapper.writeValueAsString(tuple);
+    } catch (JsonProcessingException e) {
+      logger.error("Error while converting tuple {} {}", tuple, e);
     }
     return null;
   }
 
-  /**
-   * Get the date format
-   * 
-   * @return Date format string
-   */
-  public String getDateFormat()
-  {
-    return dateFormat;
-  }
-
-  /**
-   * Set the date format
-   * 
-   * @param dateFormat
-   */
-  public void setDateFormat(String dateFormat)
-  {
-    this.dateFormat = dateFormat;
-  }
-
   private static final Logger logger = LoggerFactory.getLogger(JsonFormatter.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9e9fe76f/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
index 98be88c..126639c 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -22,21 +22,34 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import javax.validation.ConstraintViolationException;
+
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.Description;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
 import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
@@ -99,73 +112,69 @@ public class JsonFormatterTest
   @Test
   public void testJSONToPOJO()
   {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
-
+    Ad pojo = new Ad();
+    pojo.adId = 123;
+    pojo.campaignId = 234876274;
+    pojo.description = "sports";
+    pojo.sizes = Lists.newArrayList("200x350", "600x800");
+    pojo.startDate = new DateTime().withDate(2016, 1, 1).withMillisOfDay(0).withZoneRetainFields(DateTimeZone.UTC)
+        .toDate();
+    pojo.endDate = new DateTime().withDate(2016, 2, 1).withMillisOfDay(0).withZoneRetainFields(DateTimeZone.UTC)
+        .toDate();
     operator.in.put(pojo);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":null}";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
-  }
-
-  @Test
-  public void testJSONToPOJODate()
-  {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = Lists.newArrayList("ABC", "PQR", "XYZ");
-    pojo.date = new DateTime().withYear(2015).withMonthOfYear(9).withDayOfMonth(15).toDate();
-    operator.setDateFormat("dd-MM-yyyy");
-    operator.setup(null);
-    operator.in.put(pojo);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":[\"ABC\",\"PQR\",\"XYZ\"],\"date\":\"15-09-2015\"}";
+    String expectedJSONString = "{\"adId\":123,\"campaignId\":234876274,\"sizes\":[\"200x350\",\"600x800\"],\"startDate\":\"Fri, 1 Jan 2016 00:00:00\",\"endDate\":\"01-Feb-2016\",\"desc\":\"sports\"}";
     Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(1, operator.getEmittedObjectCount());
+    Assert.assertEquals(0, operator.getErrorTupleCount());
   }
 
   @Test
   public void testJSONToPOJONullFields()
   {
-    Test1Pojo pojo = new Test1Pojo();
-    pojo.a = 123;
-    pojo.b = 234876274;
-    pojo.c = "HowAreYou?";
-    pojo.d = null;
+    Ad pojo = new Ad();
+    pojo.adId = 123;
+    pojo.campaignId = 234876274;
+    pojo.description = "sports";
+    pojo.sizes = null;
+    pojo.startDate = null;
+    pojo.endDate = null;
 
     operator.in.put(pojo);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":123,\"b\":234876274,\"c\":\"HowAreYou?\",\"d\":null,\"date\":null}";
+    String expectedJSONString = "{\"adId\":123,\"campaignId\":234876274,\"sizes\":null,\"startDate\":null,\"endDate\":null,\"desc\":\"sports\"}";
     Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(1, operator.getEmittedObjectCount());
+    Assert.assertEquals(0, operator.getErrorTupleCount());
   }
 
   @Test
   public void testJSONToPOJOEmptyPOJO()
   {
-    Test1Pojo pojo = new Test1Pojo();
+    Ad pojo = new Ad();
     operator.in.put(pojo);
     Assert.assertEquals(1, validDataSink.collectedTuples.size());
     Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
-    LOG.debug("{}", validDataSink.collectedTuples.get(0));
+    String expectedJSONString = "{\"adId\":0,\"campaignId\":0,\"sizes\":null,\"startDate\":null,\"endDate\":null,\"desc\":null}";
     Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(1, operator.getEmittedObjectCount());
+    Assert.assertEquals(0, operator.getErrorTupleCount());
   }
 
   @Test
   public void testJSONToPOJONullPOJO()
   {
     operator.in.put(null);
-    Assert.assertEquals(1, validDataSink.collectedTuples.size());
-    Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
-    String expectedJSONString = "null";
-    Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
+    Assert.assertEquals(0, validDataSink.collectedTuples.size());
+    Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(0, operator.getEmittedObjectCount());
+    Assert.assertEquals(1, operator.getErrorTupleCount());
   }
 
   @Test
@@ -173,36 +182,85 @@ public class JsonFormatterTest
   {
     operator.endWindow();
     operator.teardown();
-    operator.setClazz(Test2Pojo.class);
     operator.setup(null);
     operator.beginWindow(1);
 
-    Test2Pojo o = new Test2Pojo();
+    TestPojo o = new TestPojo();
     operator.in.put(o);
     Assert.assertEquals(0, validDataSink.collectedTuples.size());
     Assert.assertEquals(1, invalidDataSink.collectedTuples.size());
     Assert.assertEquals(o, invalidDataSink.collectedTuples.get(0));
+    Assert.assertEquals(1, operator.getIncomingTuplesCount());
+    Assert.assertEquals(0, operator.getEmittedObjectCount());
+    Assert.assertEquals(1, operator.getErrorTupleCount());
+  }
+
+  @Test
+  public void testApplication() throws IOException, Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      Configuration conf = new Configuration(false);
+      lma.prepareDAG(new JsonFormatterApplication(), conf);
+      LocalMode.Controller lc = lma.getController();
+      lc.run(2000);// runs for 2 seconds and quits
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  public static class JsonFormatterApplication implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      PojoEmitter input = dag.addOperator("data", new PojoEmitter());
+      JsonFormatter formatter = dag.addOperator("formatter", new JsonFormatter());
+      dag.getMeta(formatter).getMeta(formatter.in).getAttributes().put(Context.PortContext.TUPLE_CLASS, Ad.class);
+      ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
+      output.setDebug(true);
+      dag.addStream("input", input.output, formatter.in);
+      dag.addStream("output", formatter.out, output.input);
+    }
   }
 
-  public static class Test1Pojo
+  public static class PojoEmitter extends BaseOperator implements InputOperator
   {
-    public int a;
-    public long b;
-    public String c;
-    public List<String> d;
-    public Date date;
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();
 
     @Override
-    public String toString()
+    public void emitTuples()
     {
-      return "Test1Pojo [a=" + a + ", b=" + b + ", c=" + c + ", d=" + d + ", date=" + date + "]";
+      Ad test1Pojo = new Ad();
+      test1Pojo.adId = 1234;
+      test1Pojo.campaignId = 2319483L;
+      test1Pojo.description = "ad";
+      test1Pojo.sizes = new ArrayList<String>();
+      test1Pojo.sizes.add("250x350");
+      test1Pojo.sizes.add("800x600");
+      test1Pojo.startDate = new DateTime().withDate(2016, 1, 1).withHourOfDay(0).withMinuteOfHour(0)
+          .withSecondOfMinute(0).toDate();
+      test1Pojo.endDate = new DateTime().withDate(2016, 2, 1).withHourOfDay(0).withMinuteOfHour(0).withSecondOfMinute(0)
+          .withZone(DateTimeZone.UTC).toDate();
+      output.emit(test1Pojo);
     }
   }
 
-  public static class Test2Pojo
+  public static class Ad
   {
+    public int adId;
+    public long campaignId;
+    @JsonProperty("desc")
+    public String description;
+    public List<String> sizes;
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "EEE, d MMM yyyy HH:mm:ss")
+    public Date startDate;
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MMM-yyyy")
+    public Date endDate;
   }
 
-  private static final Logger LOG = LoggerFactory.getLogger(JsonFormatterTest.class);
+  public static class TestPojo
+  {
+  }
 
 }


Mime
View raw message