Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D5433200B68 for ; Fri, 19 Aug 2016 10:27:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D3B11160AAC; Fri, 19 Aug 2016 08:27:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A42A0160A8E for ; Fri, 19 Aug 2016 10:27:45 +0200 (CEST) Received: (qmail 65570 invoked by uid 500); 19 Aug 2016 08:27:44 -0000 Mailing-List: contact commits-help@apex.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.apache.org Delivered-To: mailing list commits@apex.apache.org Received: (qmail 65534 invoked by uid 99); 19 Aug 2016 08:27:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2016 08:27:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B5708DFE2E; Fri, 19 Aug 2016 08:27:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chinmay@apache.org To: commits@apex.apache.org Message-Id: <5f3c339722184e81850d509cfd8430ce@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: apex-malhar git commit: APEXMALHAR-2151 delimited file format support to FSLoader Date: Fri, 19 Aug 2016 08:27:44 +0000 (UTC) archived-at: Fri, 19 Aug 2016 08:27:47 -0000 Repository: apex-malhar Updated Branches: refs/heads/master 9b6e11d85 -> 571db6c06 APEXMALHAR-2151 delimited file format support to FSLoader Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/571db6c0 Tree: 
http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/571db6c0 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/571db6c0 Branch: refs/heads/master Commit: 571db6c063a0cd34d9379148f3eada85b16a0760 Parents: 9b6e11d Author: shubham Authored: Wed Aug 10 14:13:10 2016 +0530 Committer: shubham Committed: Thu Aug 18 14:05:30 2016 +0530 ---------------------------------------------------------------------- .../contrib/enrich/DelimitedFSLoader.java | 165 +++++++++++++++++++ .../datatorrent/contrib/enrich/FSLoader.java | 52 +++--- .../contrib/enrich/JsonFSLoader.java | 74 +++++++++ .../contrib/enrich/FileEnrichmentTest.java | 70 +++++++- .../src/test/resources/productmapping-delim.txt | 100 +++++++++++ 5 files changed, 431 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java new file mode 100644 index 0000000..25f283c --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.contrib.enrich; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.cellprocessor.ift.CellProcessor; +import org.supercsv.io.CsvMapReader; +import org.supercsv.prefs.CsvPreference; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.contrib.parser.CellProcessorBuilder; +import com.datatorrent.contrib.parser.DelimitedSchema; +import com.datatorrent.contrib.parser.DelimitedSchema.Field; +import com.datatorrent.lib.util.ReusableStringReader; + +/** + * This implementation of {@link FSLoader} is used to load data from delimited + * file.User needs to provide a schema as a string specified in a json format as + * per {@link DelimitedSchema} that contains information of name and type of + * field + */ +@InterfaceStability.Evolving +public class DelimitedFSLoader extends FSLoader +{ + + /** + * Map Reader to read delimited records + */ + private transient CsvMapReader csvMapReader; + /** + * Reader used by csvMapReader + */ + private transient ReusableStringReader csvStringReader; + /** + * Contents of the schema.Schema is specified in a json format as per + * {@link DelimitedSchema} + */ + @NotNull + private String schema; + /** + * Schema is read into this object to access fields + */ + private transient DelimitedSchema delimitedParserSchema; + /** + * Cell processors are an integral part 
of reading and writing with Super CSV + * they automate the data type conversions, and enforce constraints. + */ + private transient CellProcessor[] processors; + /** + * Names of all the fields in the same order of incoming records + */ + private transient String[] nameMapping; + /** + * header-this will be delimiter separated string of field names + */ + private transient String header; + /** + * Reading preferences that are passed through schema + */ + private transient CsvPreference preference; + + private boolean initialized; + + private static final Logger logger = LoggerFactory.getLogger(DelimitedFSLoader.class); + + public DelimitedFSLoader() + { + } + + /** + * Extracts the fields from a delimited record and returns a map containing + * field names and values + */ + @Override + Map extractFields(String line) + { + if (!initialized) { + init(); + initialized = true; + } + if (StringUtils.isBlank(line) || StringUtils.equals(line, header)) { + return null; + } + try { + csvStringReader.open(line); + return csvMapReader.read(nameMapping, processors); + } catch (IOException e) { + logger.error("Error parsing line{} Exception {}", line, e.getMessage()); + return null; + } + } + + private void init() + { + + delimitedParserSchema = new DelimitedSchema(schema); + preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(), + delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build(); + nameMapping = delimitedParserSchema.getFieldNames() + .toArray(new String[delimitedParserSchema.getFieldNames().size()]); + header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + ""); + processors = getProcessor(delimitedParserSchema.getFields()); + csvStringReader = new ReusableStringReader(); + csvMapReader = new CsvMapReader(csvStringReader, preference); + } + + /** + * Returns array of cellprocessors, one for each field + */ + private CellProcessor[] getProcessor(List fields) + { + CellProcessor[] 
processor = new CellProcessor[fields.size()]; + int fieldCount = 0; + for (Field field : fields) { + processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints()); + } + return processor; + } + + /** + * Get the schema + * + * @return + */ + public String getSchema() + { + return schema; + } + + /** + * Set the schema + * + * @param schema + */ + public void setSchema(String schema) + { + this.schema = schema; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java index 0c23c62..997243d 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java @@ -25,10 +25,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectReader; -import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,30 +40,18 @@ import com.google.common.collect.Maps; import com.datatorrent.lib.db.cache.CacheManager; import com.datatorrent.lib.util.FieldInfo; - /** - * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries - * from the cache. - * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()} - * periodically to reload the file. - *

- * The format of the input file is: - *

- * {"productCategory": 5, "productId": 0} - * {"productCategory": 4, "productId": 1} - * {"productCategory": 5, "productId": 2} - * {"productCategory": 5, "productId": 3} - *

- * Each line in the input file should be a valid json object which represents a record and each key/value pair in that - * json object represents the fields/value. - *

- * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of - * which the memory consumption may go up. + * This implementation of {@link BackendLoader} loads the data from a given file + * into memory cache and serves queries from the cache. When this is set as + * primaryCache in {@link CacheManager}, CacheManager can call + * {@link #loadInitialData()} periodically to reload the file. NOTE: This loader + * should be used with caution as all the data present in the file is loaded in + * memory because of which the memory consumption may go up. * * @since 3.4.0 */ @InterfaceStability.Evolving -public class FSLoader extends ReadOnlyBackup +public abstract class FSLoader extends ReadOnlyBackup { @NotNull private String fileName; @@ -76,8 +60,6 @@ public class FSLoader extends ReadOnlyBackup private transient FileSystem fs; private transient boolean connected; - private static final ObjectMapper mapper = new ObjectMapper(); - private static final ObjectReader reader = mapper.reader(new TypeReference>(){}); private static final Logger logger = LoggerFactory.getLogger(FSLoader.class); public String getFileName() @@ -103,9 +85,11 @@ public class FSLoader extends ReadOnlyBackup String line; while ((line = bin.readLine()) != null) { try { - Map tuple = reader.readValue(line); - result.put(getKey(tuple), getValue(tuple)); - } catch (JsonProcessingException parseExp) { + Map tuple = extractFields(line); + if (tuple != null && !tuple.isEmpty()) { + result.put(getKey(tuple), getValue(tuple)); + } + } catch (Exception parseExp) { logger.info("Unable to parse line {}", line); } } @@ -128,6 +112,18 @@ public class FSLoader extends ReadOnlyBackup return result; } + /** + * This method is called by {@link #loadInitialData()} to extract values from + * a record. Concrete implementations override this method to parse a record + * and convert it to Map of field names and values OR simply returns null to + * skip the records. 
+ * + * @param line + * A single record from file + * @return a map with field name and value. Null value if returned is ignored + */ + abstract Map extractFields(String line); + private Object getValue(Map tuple) { ArrayList includeTuple = new ArrayList(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java new file mode 100644 index 0000000..a1d139a --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.contrib.enrich; + +import java.io.IOException; +import java.util.Map; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This implementation of {@link FSLoader} is used to load data from json file. + *

+ * The input file must contain one JSON object per line, e.g.: + *

+ * {"productCategory": 5, "productId": 0} + * {"productCategory": 4, "productId": 1} + * {"productCategory": 5, "productId": 2} + * {"productCategory": 5, "productId": 3} + *

+ * Each line in the input file should be a valid JSON object that represents a + * record; each key/value pair in that object is a field name and its + * value. + *

+ * + */ +@InterfaceStability.Evolving +public class JsonFSLoader extends FSLoader +{ + + private static final ObjectMapper mapper = new ObjectMapper(); + private static final ObjectReader reader = mapper.reader(new TypeReference>() + { + }); + + private static final Logger logger = LoggerFactory.getLogger(JsonFSLoader.class); + + /** + * Extracts the fields from a json record and returns a map containing field + * names and values + */ + @Override + Map extractFields(String line) + { + try { + return reader.readValue(line); + } catch (IOException e) { + logger.error("Exception while extracting fields {}", e); + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java index f24a13c..56f9c7f 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java @@ -21,7 +21,9 @@ package com.datatorrent.contrib.enrich; import java.io.File; import java.io.IOException; import java.net.URL; +import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Date; import java.util.Map; import org.junit.Assert; @@ -51,7 +53,7 @@ public class FileEnrichmentTest FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath())); MapEnricher oper = new MapEnricher(); - FSLoader store = new FSLoader(); + FSLoader store = new JsonFSLoader(); store.setFileName(fileUrl.toString()); oper.setLookupFields(Arrays.asList("productId")); oper.setIncludeFields(Arrays.asList("productCategory")); @@ -99,5 +101,69 @@ public class FileEnrichmentTest Assert.assertEquals("value of product category is 1", 5, 
emitted.get("productCategory")); Assert.assertTrue(emitted.get("productCategory") instanceof Integer); } -} + @Test + public void testEnrichmentOperatorDelimitedFSLoader() throws IOException, InterruptedException + { + URL origUrl = this.getClass().getResource("/productmapping-delim.txt"); + + URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping-delim1.txt"); + FileUtils.deleteQuietly(new File(fileUrl.getPath())); + FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath())); + MapEnricher oper = new MapEnricher(); + DelimitedFSLoader store = new DelimitedFSLoader(); + // store.setFieldDescription("productCategory:INTEGER,productId:INTEGER"); + store.setFileName(fileUrl.toString()); + store.setSchema( + "{\"separator\":\",\",\"fields\": [{\"name\": \"productCategory\",\"type\": \"Integer\"},{\"name\": \"productId\",\"type\": \"Integer\"},{\"name\": \"mfgDate\",\"type\": \"Date\",\"constraints\": {\"format\": \"dd/MM/yyyy\"}}]}"); + oper.setLookupFields(Arrays.asList("productId")); + oper.setIncludeFields(Arrays.asList("productCategory", "mfgDate")); + oper.setStore(store); + + oper.setup(null); + + CollectorTestSink> sink = new CollectorTestSink<>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) + CollectorTestSink tmp = (CollectorTestSink)sink; + oper.output.setSink(tmp); + + oper.activate(null); + + oper.beginWindow(0); + Map tuple = Maps.newHashMap(); + tuple.put("productId", 3); + tuple.put("channelId", 4); + tuple.put("amount", 10.0); + + Kryo kryo = new Kryo(); + oper.input.process(kryo.copy(tuple)); + + oper.endWindow(); + + oper.deactivate(); + + /* Number of tuple, emitted */ + Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size()); + Map emitted = sink.collectedTuples.iterator().next(); + + /* The fields present in original event is kept as it is */ + Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size()); + Assert.assertEquals("value of productId is 3", 
tuple.get("productId"), emitted.get("productId")); + Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId")); + Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount")); + + /* Check if productCategory is added to the event */ + Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory")); + Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory")); + Assert.assertTrue(emitted.get("productCategory") instanceof Integer); + + /* Check if mfgDate is added to the event */ + Assert.assertEquals("mfgDate is part of tuple", true, emitted.containsKey("productCategory")); + Date mfgDate = (Date)emitted.get("mfgDate"); + Assert.assertEquals("value of day", 1, mfgDate.getDate()); + Assert.assertEquals("value of month", 0, mfgDate.getMonth()); + Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900); + Assert.assertTrue(emitted.get("mfgDate") instanceof Date); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/resources/productmapping-delim.txt ---------------------------------------------------------------------- diff --git a/contrib/src/test/resources/productmapping-delim.txt b/contrib/src/test/resources/productmapping-delim.txt new file mode 100755 index 0000000..1a685cb --- /dev/null +++ b/contrib/src/test/resources/productmapping-delim.txt @@ -0,0 +1,100 @@ +5,0,01/01/2016 +4,1,01/01/2016 +5,2,01/01/2016 +5,3,01/01/2016 +5,4,01/01/2016 +1,5,01/01/2016 +2,6,01/01/2016 +4,7,01/01/2016 +2,8,01/01/2016 +3,9,01/01/2016 +1,10,01/01/2016 +5,11,01/01/2016 +5,12,01/01/2016 +1,13,01/01/2016 +1,14,01/01/2016 +2,15,01/01/2016 +3,16,01/01/2016 +5,17,01/01/2016 +2,18,01/01/2016 +2,19,01/01/2016 +2,20,01/01/2016 +3,21,01/01/2016 +2,22,01/01/2016 +5,23,01/01/2016 +4,24,01/01/2016 +1,25,01/01/2016 +3,26,01/01/2016 +3,27,01/01/2016 +3,28,01/01/2016 +5,29,01/01/2016 +2,30,01/01/2016 
+3,31,01/01/2016 +3,32,01/01/2016 +3,33,01/01/2016 +1,34,01/01/2016 +3,35,01/01/2016 +2,36,01/01/2016 +1,37,01/01/2016 +3,38,01/01/2016 +2,39,01/01/2016 +1,40,01/01/2016 +5,41,01/01/2016 +3,42,01/01/2016 +5,43,01/01/2016 +2,44,01/01/2016 +4,45,01/01/2016 +5,46,01/01/2016 +2,47,01/01/2016 +3,48,01/01/2016 +5,49,01/01/2016 +5,50,01/01/2016 +4,51,01/01/2016 +5,52,01/01/2016 +1,53,01/01/2016 +5,54,01/01/2016 +4,55,01/01/2016 +4,56,01/01/2016 +2,57,01/01/2016 +4,58,01/01/2016 +4,59,01/01/2016 +4,60,01/01/2016 +1,61,01/01/2016 +2,62,01/01/2016 +3,63,01/01/2016 +5,64,01/01/2016 +1,65,01/01/2016 +5,66,01/01/2016 +5,67,01/01/2016 +2,68,01/01/2016 +3,69,01/01/2016 +3,70,01/01/2016 +2,71,01/01/2016 +3,72,01/01/2016 +4,73,01/01/2016 +2,74,01/01/2016 +3,75,01/01/2016 +3,76,01/01/2016 +4,77,01/01/2016 +5,78,01/01/2016 +4,79,01/01/2016 +1,80,01/01/2016 +1,81,01/01/2016 +1,82,01/01/2016 +3,83,01/01/2016 +1,84,01/01/2016 +5,85,01/01/2016 +3,86,01/01/2016 +4,87,01/01/2016 +1,88,01/01/2016 +5,89,01/01/2016 +3,90,01/01/2016 +5,91,01/01/2016 +2,92,01/01/2016 +2,93,01/01/2016 +3,94,01/01/2016 +1,95,01/01/2016 +1,96,01/01/2016 +5,97,01/01/2016 +3,98,01/01/2016 +5,99,01/01/2016