apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chin...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2152 - FSLoader fixed length support
Date Thu, 08 Sep 2016 12:05:38 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 99f89731b -> 4c651f990


APEXMALHAR-2152 - FSLoader fixed length support


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/faf7a607
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/faf7a607
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/faf7a607

Branch: refs/heads/master
Commit: faf7a60744e6c0b24e5e9c455e5e777747eb582b
Parents: 2e47b4c
Author: shubham <shubham-pathak22@github.com>
Authored: Tue Aug 23 14:33:12 2016 +0530
Committer: shubham <shubham-pathak22@github.com>
Committed: Wed Aug 31 15:25:45 2016 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../contrib/enrich/FixedWidthFSLoader.java      | 270 +++++++++++++++++++
 .../contrib/enrich/FileEnrichmentTest.java      |  69 ++++-
 .../src/test/resources/fixed-width-sample.txt   |   6 +
 4 files changed, 350 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 70a615c..84a7e05 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -658,5 +658,11 @@
       <version>1.8.0.7</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.0.0</version>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
new file mode 100644
index 0000000..8b7eac0
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.univocity.parsers.fixed.FixedWidthFields;
+import com.univocity.parsers.fixed.FixedWidthParser;
+import com.univocity.parsers.fixed.FixedWidthParserSettings;
+
+import com.datatorrent.contrib.parser.AbstractCsvParser.FIELD_TYPE;
+import com.datatorrent.contrib.parser.AbstractCsvParser.Field;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a
+ * fixed-width file. The user needs to set
+ * {@link FixedWidthFSLoader#fieldDescription} to specify field information.
+ */
+@InterfaceStability.Evolving
+public class FixedWidthFSLoader extends FSLoader
+{
+  /**
+   * Regex matching a comma that is not inside a double-quoted section. Used to
+   * split {@link #fieldDescription} into per-field definitions so that a
+   * quoted date format containing a comma is not split apart.
+   */
+  private static final String FIELD_SEPARATOR_REGEX = ",(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
+
+  /**
+   * Regex matching a colon that is not inside a double-quoted section. Used to
+   * split one field definition into NAME, FIELD_TYPE, WIDTH and date format so
+   * that a quoted date format such as "dd:MM:yyyy" stays a single token.
+   */
+  private static final String ATTRIBUTE_SEPARATOR_REGEX = ":(?=([^\"]*\"[^\"]*\")*[^\"]*$)";
+
+  /**
+   * Field definitions parsed from {@link #fieldDescription}, in file order.
+   */
+  private transient List<FixedWidthField> fields;
+  /**
+   * Indicates whether first line of the file is a header. Default is false
+   */
+  private boolean hasHeader;
+
+  /**
+   * Specifies information related to fields in fixed-width file. Format is
+   * [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if FIELD_TYPE is DATE]. FIELD_TYPE
+   * can take one of the values of {@link FIELD_TYPE} i.e. BOOLEAN, DOUBLE,
+   * INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE, e.g.
+   * Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40,
+   * Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\". The date format is a
+   * {@link SimpleDateFormat} pattern (MM is month, mm is minutes) and needs to
+   * be within quotes (" ").
+   */
+  @NotNull
+  private String fieldDescription;
+
+  /**
+   * Array containing headers (the field names, in file order)
+   */
+  private transient String[] header;
+  /**
+   * Padding character. Default is white space.
+   */
+  private char padding = ' ';
+  private transient FixedWidthParser fixedWidthParser;
+  private transient boolean initialized;
+
+  private static final Logger logger = LoggerFactory.getLogger(FixedWidthFSLoader.class);
+
+  /**
+   * Gets the option if file has header or not.
+   *
+   * @return hasHeader, indicating whether first line of the file is a header.
+   */
+  public boolean isHasHeader()
+  {
+    return hasHeader;
+  }
+
+  /**
+   * Set to true if file has header
+   *
+   * @param hasHeader
+   *          Indicates whether first line of the file is a header. Default is
+   *          false
+   */
+  public void setHasHeader(boolean hasHeader)
+  {
+    this.hasHeader = hasHeader;
+  }
+
+  /**
+   * Gets the field description
+   *
+   * @return fieldDescription. String specifying information related to fields
+   *         in fixed-width file.
+   */
+  public String getFieldDescription()
+  {
+    return fieldDescription;
+  }
+
+  /**
+   * Sets fieldDescription
+   *
+   * @param fieldDescription
+   *          a String specifying information related to fields in fixed-width
+   *          file. Format is [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if
+   *          FIELD_TYPE is DATE]. FIELD_TYPE can take one of the values of
+   *          {@link FIELD_TYPE}
+   *          e.g. Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:
+   *          STRING:40,Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\". The date
+   *          format is a {@link SimpleDateFormat} pattern and needs to be
+   *          within quotes (" ")
+   */
+  public void setFieldDescription(String fieldDescription)
+  {
+    this.fieldDescription = fieldDescription;
+  }
+
+  /**
+   * Gets the character used for padding in the fixed-width file. Default is
+   * white space (' ')
+   *
+   * @return Padding character. Default is white space.
+   */
+  public char getPadding()
+  {
+    return padding;
+  }
+
+  /**
+   * Sets the character used for padding in fixed-width file. Default is white
+   * space (' ')
+   *
+   * @param padding
+   *          Padding character. Default is white space.
+   */
+  public void setPadding(char padding)
+  {
+    this.padding = padding;
+  }
+
+  /**
+   * A {@link Field} with the additional fixed-width attributes: the column
+   * width in characters and, for DATE fields, the date pattern.
+   */
+  public static class FixedWidthField extends Field
+  {
+    int width;
+    String dateFormat;
+
+    public int getWidth()
+    {
+      return width;
+    }
+
+    public void setWidth(int width)
+    {
+      this.width = width;
+    }
+
+    public String getDateFormat()
+    {
+      return dateFormat;
+    }
+
+    public void setDateFormat(String dateFormat)
+    {
+      this.dateFormat = dateFormat;
+    }
+
+  }
+
+  /**
+   * Extracts the fields from a fixed width record and returns a map containing
+   * field names and values.
+   *
+   * @param line one record of the file
+   * @return map of field name to typed value, or null when the line is the
+   *         header (with {@link #hasHeader} set) or yields no values
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    if (!initialized) {
+      init();
+      initialized = true;
+    }
+    String[] values = fixedWidthParser.parseLine(line);
+    // Nothing was parsed from this line (e.g. it is empty): nothing to load.
+    if (values == null) {
+      return null;
+    }
+    if (hasHeader && Arrays.deepEquals(values, header)) {
+      return null;
+    }
+    Map<String, Object> map = Maps.newHashMap();
+    for (int i = 0; i < fields.size(); i++) {
+      FixedWidthField field = fields.get(i);
+      // A short record can yield fewer values than configured fields; treat
+      // the missing trailing values as absent instead of failing.
+      String value = i < values.length ? values[i] : null;
+      map.put(field.getName(), getValue(field, value));
+    }
+    return map;
+  }
+
+  /**
+   * Parses {@link #fieldDescription} into {@link #fields} and {@link #header},
+   * and builds the fixed-width parser from the declared widths and padding.
+   *
+   * @throws IllegalArgumentException if a field definition has fewer than the
+   *           three mandatory parts, or a DATE field has no date format
+   */
+  private void init()
+  {
+    fields = new ArrayList<FixedWidthField>();
+    List<String> headers = new ArrayList<String>();
+    List<Integer> fieldWidth = new ArrayList<Integer>();
+    for (String tmp : fieldDescription.split(FIELD_SEPARATOR_REGEX)) {
+      String[] fieldTuple = tmp.split(ATTRIBUTE_SEPARATOR_REGEX, -1);
+      if (fieldTuple.length < 3) {
+        logger.error("Invalid field description {}", tmp);
+        throw new IllegalArgumentException("Invalid field description '" + tmp
+            + "', expected [NAME]:[FIELD_TYPE]:[WIDTH]");
+      }
+      int width = Integer.parseInt(fieldTuple[2].trim());
+      FixedWidthField field = new FixedWidthField();
+      field.setName(fieldTuple[0]);
+      field.setType(fieldTuple[1].trim());
+      field.setWidth(width);
+      headers.add(fieldTuple[0]);
+      fieldWidth.add(width);
+      if (field.getType() == FIELD_TYPE.DATE) {
+        if (fieldTuple.length > 3) {
+          // The date format is given within quotes; strip them.
+          field.setDateFormat(fieldTuple[3].replace("\"", ""));
+        } else {
+          logger.error("Date format is missing for the field {}", field.getName());
+          throw new IllegalArgumentException("Missing date format for field " + field.getName());
+        }
+      }
+      fields.add(field);
+    }
+    header = headers.toArray(new String[headers.size()]);
+    int[] width = Ints.toArray(fieldWidth);
+    FixedWidthFields lengths = new FixedWidthFields(header, width);
+    FixedWidthParserSettings settings = new FixedWidthParserSettings(lengths);
+    settings.getFormat().setPadding(this.padding);
+    fixedWidthParser = new FixedWidthParser(settings);
+  }
+
+  /**
+   * Converts a raw field value to the Java type declared for the field.
+   *
+   * @param field the field definition
+   * @param value the raw value; may be null
+   * @return the converted value, or null if the value is null or empty
+   * @throws RuntimeException if a DATE value does not match its format
+   */
+  private Object getValue(FixedWidthField field, String value)
+  {
+    if (StringUtils.isEmpty(value)) {
+      return null;
+    }
+    switch (field.getType()) {
+      case BOOLEAN:
+        return Boolean.parseBoolean(value);
+      case DOUBLE:
+        return Double.parseDouble(value);
+      case INTEGER:
+        return Integer.parseInt(value);
+      case FLOAT:
+        return Float.parseFloat(value);
+      case LONG:
+        return Long.parseLong(value);
+      case SHORT:
+        return Short.parseShort(value);
+      case CHARACTER:
+        return value.charAt(0);
+      case DATE:
+        try {
+          // A new formatter per call: SimpleDateFormat is not thread-safe.
+          return new SimpleDateFormat(field.getDateFormat()).parse(value);
+        } catch (ParseException e) {
+          logger.error("Error parsing date for format {} and value {}", field.getDateFormat(), value);
+          throw new RuntimeException(e);
+        }
+      default:
+        return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
index 56f9c7f..d12cdae 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.contrib.enrich;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Map;
@@ -34,6 +33,7 @@ import org.apache.commons.io.FileUtils;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.google.common.collect.Maps;
+
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -166,4 +166,71 @@ public class FileEnrichmentTest
     Assert.assertTrue(emitted.get("mfgDate") instanceof Date);
   }
 
+  /**
+   * Enriches a tuple keyed on Year using a FixedWidthFSLoader over an
+   * underscore-padded sample file with a header line, and checks the typed
+   * values (INTEGER, STRING, DOUBLE, DATE) of the emitted tuple.
+   */
+  @Test
+  public void testEnrichmentOperatorFixedWidthFSLoader() throws IOException, InterruptedException
+  {
+    URL origUrl = this.getClass().getResource("/fixed-width-sample.txt");
+    MapEnricher oper = new MapEnricher();
+    FixedWidthFSLoader store = new FixedWidthFSLoader();
+    // SimpleDateFormat pattern: MM is month-of-year; mm would mean minutes.
+    store.setFieldDescription(
+        "Year:INTEGER:4,Make:STRING:5,Model:STRING:40,Description:STRING:40,Price:DOUBLE:8,Date:DATE:10:\"dd:MM:yyyy\"");
+    store.setHasHeader(true);
+    store.setPadding('_');
+    store.setFileName(origUrl.toString());
+    oper.setLookupFields(Arrays.asList("Year"));
+    oper.setIncludeFields(Arrays.asList("Year", "Make", "Model", "Price", "Date"));
+    oper.setStore(store);
+
+    oper.setup(null);
+
+    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.output.setSink(tmp);
+
+    oper.activate(null);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("Year", 1997);
+
+    Kryo kryo = new Kryo();
+    oper.input.process(kryo.copy(tuple));
+
+    oper.endWindow();
+
+    oper.deactivate();
+    oper.teardown();
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+    /* The fields present in original event is kept as it is */
+    Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size());
+    Assert.assertEquals("Value of Year is 1997", tuple.get("Year"), emitted.get("Year"));
+
+    /* Check if Make is added to the event */
+    Assert.assertTrue("Make is part of tuple", emitted.containsKey("Make"));
+    Assert.assertEquals("Value of Make", "Ford", emitted.get("Make"));
+
+    /* Check if Model is added to the event */
+    Assert.assertTrue("Model is part of tuple", emitted.containsKey("Model"));
+    Assert.assertEquals("Value of Model", "E350", emitted.get("Model"));
+
+    /* Check if Price is added to the event */
+    Assert.assertTrue("Price is part of tuple", emitted.containsKey("Price"));
+    Assert.assertTrue(emitted.get("Price") instanceof Double);
+    Assert.assertEquals("Value of Price is 3000", 3000.0, emitted.get("Price"));
+
+    /* Check if Date is added to the event; verify the type before casting */
+    Assert.assertTrue("Date is part of tuple", emitted.containsKey("Date"));
+    Assert.assertTrue(emitted.get("Date") instanceof Date);
+    Date mfgDate = (Date)emitted.get("Date");
+    Assert.assertEquals("value of day", 1, mfgDate.getDate());
+    Assert.assertEquals("value of month", 0, mfgDate.getMonth());
+    Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900);
+    // With a (wrong) "mm" pattern the middle "01" would land in the minutes.
+    Assert.assertEquals("value of minutes", 0, mfgDate.getMinutes());
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/faf7a607/contrib/src/test/resources/fixed-width-sample.txt
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/fixed-width-sample.txt b/contrib/src/test/resources/fixed-width-sample.txt
new file mode 100644
index 0000000..cd1b7b3
--- /dev/null
+++ b/contrib/src/test/resources/fixed-width-sample.txt
@@ -0,0 +1,6 @@
+YearMake_Model___________________________________Description_____________________________Price___Date______
+1997Ford_E350____________________________________ac, abs, moon___________________________3000.00_01:01:2016
+1999ChevyVenture "Extended Edition"______________________________________________________4900.00_01:01:2016
+1996Jeep_Grand Cherokee__________________________MUST SELL!air, moon roof, loaded_______ 4799.00_01:01:2016
+1999ChevyVenture "Extended Edition, Very Large"__________________________________________5000.00_01:01:2016
+_________Venture "Extended Edition"______________________________________________________4900.00_01:01:2016
\ No newline at end of file


Mime
View raw message