apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2023) Adding Enrichment Operator to Malhar
Date Fri, 29 Apr 2016 17:20:13 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264366#comment-15264366 ]

ASF GitHub Bot commented on APEXMALHAR-2023:
--------------------------------------------

Github user chinmaykolhatkar commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-malhar/pull/235#discussion_r61614306
  
    --- Diff: contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java ---
    @@ -0,0 +1,181 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.contrib.enrich;
    +
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.codehaus.jackson.JsonProcessingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.codehaus.jackson.map.ObjectReader;
    +import org.codehaus.jackson.type.TypeReference;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +
    +import com.esotericsoftware.kryo.NotNull;
    +import com.google.common.collect.Maps;
    +import com.datatorrent.lib.util.FieldInfo;
    +
    +
    +/**
    + * This implementation of {@link BackendLoader} loads the data from the given file, puts it in an in-memory
    + * cache and serves queries from the same cache.
    + * When this becomes part of the cache manager, it can call {@link #loadInitialData()} periodically to reload
    + * the file.
    + * <p>
    + * The format of the file looks like the following:
    + * <p>
    + * {"productCategory": 5, "productId": 0}
    + * {"productCategory": 4, "productId": 1}
    + * {"productCategory": 5, "productId": 2}
    + * {"productCategory": 5, "productId": 3}
    + * </p>
    + * Each line in the file should be a valid JSON object representing a record, and each key/value pair in that
    + * JSON object represents a field and its value.
    + * <p>
    + * NOTE: This loader should be used with caution, as all the data present in the file is loaded into memory.
    + */
    +@InterfaceStability.Evolving
    +public class FSLoader extends ReadOnlyBackup
    +{
    +  @NotNull
    +  private String fileName;
    +
    +  private transient Path filePath;
    +  private transient FileSystem fs;
    +  private transient boolean connected;
    +
    +  private static final ObjectMapper mapper = new ObjectMapper();
    +  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
    +  private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
    +
    +  public String getFileName()
    +  {
    +    return fileName;
    +  }
    +
    +  public void setFileName(String fileName)
    +  {
    +    this.fileName = fileName;
    +  }
    +
    +  @Override
    +  public Map<Object, Object> loadInitialData()
    +  {
    +    Map<Object, Object> result = null;
    +    FSDataInputStream in = null;
    +    BufferedReader bin = null;
    +    try {
    +      result = Maps.newHashMap();
    +      in = fs.open(filePath);
    +      bin = new BufferedReader(new InputStreamReader(in));
    +      String line;
    +      while ((line = bin.readLine()) != null) {
    +        try {
    +          Map<String, Object> tuple = reader.readValue(line);
    +          result.put(getKey(tuple), getValue(tuple));
    +        } catch (JsonProcessingException parseExp) {
    +          logger.info("Unable to parse line {}", line);
    +        }
    +      }
    +    } catch (IOException e) {
    +      throw new RuntimeException(e);
    +    } finally {
    +      if (bin != null) {
    +        IOUtils.closeQuietly(bin);
    +      }
    +      if (in != null) {
    +        IOUtils.closeQuietly(in);
    +      }
    +      try {
    +        fs.close();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      }
    +    }
    +    logger.debug("loading initial data {}", result.size());
    +    return result;
    +  }
    +
    +  private Object getValue(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> includeTuple = new ArrayList<Object>();
    +    for (FieldInfo s : includeFieldInfo) {
    +      includeTuple.add(tuple.get(s.getColumnName()));
    +    }
    +    return includeTuple;
    +  }
    +
    +  private Object getKey(Map<String, Object> tuple)
    +  {
    +    ArrayList<Object> lst = new ArrayList<Object>();
    +    for (FieldInfo key : lookupFieldInfo) {
    +      lst.add(tuple.get(key.getColumnName()));
    +    }
    +    return lst;
    +  }
    +
    +  @Override
    +  public Object get(Object key)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public List<Object> getAll(List<Object> keys)
    +  {
    +    return null;
    +  }
    +
    +  @Override
    +  public void connect() throws IOException
    +  {
    +    Configuration conf = new Configuration();
    +    this.filePath = new Path(fileName);
    +    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
    --- End diff --
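
For illustration only (not part of the patch or the review thread): below is a minimal, self-contained Java sketch of the lookup-building logic the javadoc above describes, where each JSON line becomes one cache entry whose key is built from the lookup columns and whose value from the included columns. The class name, the column lists, and the field names productId/productCategory are hypothetical, taken only from the example format in the javadoc.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.codehaus.jackson.map.ObjectMapper;
    import org.codehaus.jackson.map.ObjectReader;
    import org.codehaus.jackson.type.TypeReference;

    public class JsonLinesLookupSketch
    {
      private static final ObjectReader READER =
          new ObjectMapper().reader(new TypeReference<Map<String, Object>>(){});

      // Lookup (key) columns and included (value) columns, analogous to
      // lookupFieldInfo and includeFieldInfo in FSLoader.
      private static final List<String> LOOKUP_COLUMNS = Arrays.asList("productId");
      private static final List<String> INCLUDE_COLUMNS = Arrays.asList("productCategory");

      // Builds the in-memory cache: one entry per JSON line.
      public static Map<Object, Object> build(Iterable<String> jsonLines) throws java.io.IOException
      {
        Map<Object, Object> cache = new HashMap<Object, Object>();
        for (String line : jsonLines) {
          Map<String, Object> tuple = READER.readValue(line);
          cache.put(project(tuple, LOOKUP_COLUMNS), project(tuple, INCLUDE_COLUMNS));
        }
        return cache;
      }

      // Projects the selected columns into a list, mirroring getKey()/getValue().
      private static List<Object> project(Map<String, Object> tuple, List<String> columns)
      {
        List<Object> values = new ArrayList<Object>();
        for (String column : columns) {
          values.add(tuple.get(column));
        }
        return values;
      }

      public static void main(String[] args) throws Exception
      {
        List<String> lines = Arrays.asList(
            "{\"productCategory\": 5, \"productId\": 0}",
            "{\"productCategory\": 4, \"productId\": 1}");
        // e.g. {[0]=[5], [1]=[4]} (HashMap iteration order is unspecified)
        System.out.println(build(lines));
      }
    }

Because the composite key is a List with content-based equals/hashCode, lookups against the HashMap work directly on the projected key.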
    
    Considering the size of the file, I think it might just be more efficient to reload the file.
    This FSLoader is meant to be used when the file is small enough to be loaded in memory.
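
The "periodically reload the file" idea mentioned in the javadoc could be driven by any scheduler; the sketch below is only an assumption about how such a refresh might look, using a plain ScheduledExecutorService rather than Malhar's actual cache manager, with a hypothetical loader callback standing in for loadInitialData().

    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    public class PeriodicReloadSketch
    {
      private final AtomicReference<Map<Object, Object>> cache =
          new AtomicReference<Map<Object, Object>>();
      private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();

      // loader stands in for FSLoader.loadInitialData(); it rebuilds the whole map from the file.
      public void start(final Callable<Map<Object, Object>> loader, long refreshSeconds)
      {
        scheduler.scheduleAtFixedRate(new Runnable()
        {
          @Override
          public void run()
          {
            try {
              cache.set(loader.call()); // swap in the freshly loaded map atomically
            } catch (Exception e) {
              // keep serving the previous snapshot if a reload fails
            }
          }
        }, 0, refreshSeconds, TimeUnit.SECONDS);
      }

      public Object lookup(Object key)
      {
        Map<Object, Object> snapshot = cache.get();
        return snapshot == null ? null : snapshot.get(key);
      }

      public void stop()
      {
        scheduler.shutdownNow();
      }
    }

Swapping the whole map through an AtomicReference keeps lookups lock-free and leaves the previous snapshot in place if a reload fails.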



> Adding Enrichment Operator to Malhar
> ------------------------------------
>
>                 Key: APEXMALHAR-2023
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2023
>             Project: Apache Apex Malhar
>          Issue Type: New Feature
>          Components: adapters database
>    Affects Versions: 3.3.1
>            Reporter: Chinmay Kolhatkar
>            Assignee: Chinmay Kolhatkar
>
> Add Enrichment Operator to Apex Malhar.
> Discussion is happening in mailing list here:
> http://mail-archives.apache.org/mod_mbox/incubator-apex-dev/201603.mbox/%3CCAKJfLDMo24-Gcvum2ZL8-0JOnE8QLryAy0Zu_R5zhMd_bsJyHw%40mail.gmail.com%3E
> Ponymail permalink:
> https://pony-poc.apache.org/thread.html/Z8t5ut5pu5vprgt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
