apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sand...@apache.org
Subject [1/4] incubator-apex-malhar git commit: APEXMALHAR-2023 Adding Enrichment Operator to Malhar
Date Tue, 10 May 2016 09:05:55 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 3e09d7b25 -> 79c643b95


APEXMALHAR-2023 Adding Enrichment Operator to Malhar

Developed Map and Bean Enrichment Operators
Developed JDBC Loader
Developed HBase Loader


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/528e7aca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/528e7aca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/528e7aca

Branch: refs/heads/master
Commit: 528e7aca93ef32ce60be630d9d0f74ce3611059f
Parents: 08cb9d3
Author: Chaitanya <chaitanya@datatorrent.com>
Authored: Tue Mar 29 14:46:17 2016 +0530
Committer: chinmaykolhatkar <chinmay@datatorrent.com>
Committed: Fri Apr 29 22:12:47 2016 +0530

----------------------------------------------------------------------
 .../enrichment/AbstractEnrichmentOperator.java  | 170 +++++++++++++++++
 .../contrib/enrichment/EnrichmentBackup.java    |  18 ++
 .../contrib/enrichment/FSLoader.java            | 146 +++++++++++++++
 .../contrib/enrichment/HBaseLoader.java         | 128 +++++++++++++
 .../contrib/enrichment/JDBCLoader.java          | 158 ++++++++++++++++
 .../enrichment/MapEnrichmentOperator.java       |  57 ++++++
 .../enrichment/NullValuesCacheManager.java      |  43 +++++
 .../enrichment/POJOEnrichmentOperator.java      | 185 +++++++++++++++++++
 .../contrib/enrichment/ReadOnlyBackup.java      |  38 ++++
 .../contrib/enrichment/package-info.java        |   1 +
 .../enrichment/BeanEnrichmentOperatorTest.java  |  95 ++++++++++
 .../contrib/enrichment/EmployeeOrder.java       |  95 ++++++++++
 .../contrib/enrichment/FileEnrichmentTest.java  |  75 ++++++++
 .../contrib/enrichment/HBaseLoaderTest.java     | 162 ++++++++++++++++
 .../contrib/enrichment/JDBCLoaderTest.java      | 179 ++++++++++++++++++
 .../enrichment/MapEnrichmentOperatorTest.java   | 152 +++++++++++++++
 16 files changed, 1702 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
new file mode 100644
index 0000000..cbd4d5e
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/AbstractEnrichmentOperator.java
@@ -0,0 +1,170 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.lib.db.cache.CacheManager;
+import com.datatorrent.lib.db.cache.CacheStore;
+import com.datatorrent.lib.db.cache.CacheStore.ExpiryType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base class for Enrichment Operator. Subclasses should provide implementation to getKey and convert.
+ * The operator receives a tuple and emits enriched tuple based on includeFields and lookupFields. <br/>
+ *
+ * Properties:<br>
+ * <b>lookupFieldsStr</b>: List of comma separated keys for quick searching. Ex: Field1,Field2,Field3<br>
+ * <b>includeFieldsStr</b>: List of comma separated fields to be replaced/added to the input tuple. Ex: Field1,Field2,Field3<br>
+ * <b>store</b>: Specify the type of loader for looking up data<br>
+ * <br>
+ *
+ * @displayName Abstract Enrichment Operator
+ * @tags Enrichment
+ * @param <INPUT> Type of tuples which are received by this operator
+ * @param <OUTPUT> Type of tuples which are emitted by this operator
+ * @since 2.1.0
+ */
+public abstract class AbstractEnrichmentOperator<INPUT, OUTPUT> extends BaseOperator
+{
+  /**
+   * Cache manager fronting the backup store; keeps lookup data in memory for fast access.
+   */
+  private transient CacheManager cacheManager;
+
+  private transient CacheStore primaryCache = new CacheStore();
+
+  // Cache tuning knobs; defaults keep an entry for one day and clean up once a day.
+  private int entryExpiryDurationInMillis = 24 * 60 * 60 * 1000;
+  private int cacheCleanupInMillis = 24 * 60 * 60 * 1000;
+  private int cacheSize = 1024;
+
+  public transient DefaultOutputPort<OUTPUT> output = new DefaultOutputPort<OUTPUT>();
+
+  @InputPortFieldAnnotation(optional = true)
+  public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>()
+  {
+    @Override
+    public void process(INPUT tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  private EnrichmentBackup store;
+
+  @NotNull
+  protected String lookupFieldsStr;
+
+  protected String includeFieldsStr;
+
+  protected transient List<String> lookupFields = new ArrayList<String>();
+  protected transient List<String> includeFields = new ArrayList<String>();
+
+  /**
+   * Looks up the tuple's key in the cache and emits the enriched tuple.
+   * Tuples whose key evaluates to null are dropped silently.
+   */
+  protected void processTuple(INPUT tuple)
+  {
+    Object key = getKey(tuple);
+    if (key != null) {
+      Object result = cacheManager.get(key);
+      OUTPUT out = convert(tuple, result);
+      emitTuple(out);
+    }
+  }
+
+  /**
+   * Extract the lookup key from the incoming tuple.
+   */
+  protected abstract Object getKey(INPUT tuple);
+
+  protected void emitTuple(OUTPUT tuple)
+  {
+    output.emit(tuple);
+  }
+
+  /**
+   * Merge the cached lookup values into the input tuple and return the enriched tuple.
+   *
+   * @param in     original input tuple
+   * @param cached value returned by the cache for this tuple's key (may be null)
+   */
+  protected abstract OUTPUT convert(INPUT in, Object cached);
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    cacheManager = new NullValuesCacheManager();
+
+    // Configure the in-memory (primary) cache; entries expire a fixed time after write.
+    primaryCache.setEntryExpiryDurationInMillis(entryExpiryDurationInMillis);
+    primaryCache.setCacheCleanupInMillis(cacheCleanupInMillis);
+    primaryCache.setEntryExpiryStrategy(ExpiryType.EXPIRE_AFTER_WRITE);
+    primaryCache.setMaxCacheSize(cacheSize);
+
+    lookupFields = Arrays.asList(lookupFieldsStr.split(","));
+    if (includeFieldsStr != null) {
+      includeFields = Arrays.asList(includeFieldsStr.split(","));
+    }
+
+    try {
+      store.setFields(lookupFields, includeFields);
+
+      cacheManager.setPrimary(primaryCache);
+      cacheManager.setBackup(store);
+      cacheManager.initialize();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to initialize primary cache", e);
+    }
+  }
+
+  /**
+   * Set the type of backup store for storing and searching data.
+   */
+  public void setStore(EnrichmentBackup store)
+  {
+    this.store = store;
+  }
+
+  public EnrichmentBackup getStore()
+  {
+    return store;
+  }
+
+  public CacheStore getPrimaryCache()
+  {
+    return primaryCache;
+  }
+
+  public String getLookupFieldsStr()
+  {
+    return lookupFieldsStr;
+  }
+
+  /**
+   * Set the lookup fields for quick searching. It would be in comma separated list.
+   */
+  public void setLookupFieldsStr(String lookupFieldsStr)
+  {
+    this.lookupFieldsStr = lookupFieldsStr;
+  }
+
+  public String getIncludeFieldsStr()
+  {
+    return includeFieldsStr;
+  }
+
+  /**
+   * Set the list of comma separated fields to be added/replaced to the incoming tuple.
+   */
+  public void setIncludeFieldsStr(String includeFieldsStr)
+  {
+    this.includeFieldsStr = includeFieldsStr;
+  }
+
+  public int getEntryExpiryDurationInMillis()
+  {
+    return entryExpiryDurationInMillis;
+  }
+
+  /**
+   * Set how long a cache entry stays valid after it is written, in milliseconds.
+   */
+  public void setEntryExpiryDurationInMillis(int entryExpiryDurationInMillis)
+  {
+    this.entryExpiryDurationInMillis = entryExpiryDurationInMillis;
+  }
+
+  public int getCacheCleanupInMillis()
+  {
+    return cacheCleanupInMillis;
+  }
+
+  /**
+   * Set the interval between cache cleanup passes, in milliseconds.
+   */
+  public void setCacheCleanupInMillis(int cacheCleanupInMillis)
+  {
+    this.cacheCleanupInMillis = cacheCleanupInMillis;
+  }
+
+  public int getCacheSize()
+  {
+    return cacheSize;
+  }
+
+  /**
+   * Set the maximum number of entries held by the primary cache.
+   */
+  public void setCacheSize(int cacheSize)
+  {
+    this.cacheSize = cacheSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
new file mode 100644
index 0000000..9155b13
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/EnrichmentBackup.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+import java.util.List;
+
+/**
+ * Contract for a lookup store that backs the enrichment operators.
+ *
+ * @since 3.1.0
+ */
+public interface EnrichmentBackup extends CacheManager.Backup
+{
+  /**
+   * Supply the key fields used for lookups and the fields to pull into the enriched tuple.
+   */
+  void setFields(List<String> lookupFields, List<String> includeFields);
+
+  /**
+   * @return true when the backing data may change and should be reloaded periodically
+   */
+  boolean needRefresh();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
new file mode 100644
index 0000000..2effed0
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/FSLoader.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+package com.datatorrent.contrib.enrichment;
+
+import com.google.common.collect.Maps;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Read-only enrichment backup that loads the whole lookup data set from an HDFS
+ * file of JSON records (one record per line) into memory during initialization.
+ *
+ * @since 3.1.0
+ */
+public class FSLoader extends ReadOnlyBackup
+{
+  @NotNull
+  private String fileName;
+
+  private transient Path filePath;
+  private transient FileSystem fs;
+  private transient boolean connected;
+
+  // Shared, thread-safe JSON machinery; static finals need no transient marker.
+  private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>()
+  {
+  });
+  private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
+
+  public String getFileName()
+  {
+    return fileName;
+  }
+
+  /**
+   * Set the path of the lookup file to load.
+   */
+  public void setFileName(String fileName)
+  {
+    this.fileName = fileName;
+  }
+
+  /**
+   * Reads the whole file and builds a map of lookup-key to include-field values.
+   * Malformed lines are skipped with a warning rather than failing the load.
+   */
+  @Override
+  public Map<Object, Object> loadInitialData()
+  {
+    Map<Object, Object> result = null;
+    FSDataInputStream in = null;
+    BufferedReader bin = null;
+    try {
+      result = Maps.newHashMap();
+      in = fs.open(filePath);
+      bin = new BufferedReader(new InputStreamReader(in));
+      String line;
+      while ((line = bin.readLine()) != null) {
+        try {
+          Map<String, Object> tuple = reader.readValue(line);
+          // Lazily derive includeFields from the first record when none are configured.
+          if (CollectionUtils.isEmpty(includeFields)) {
+            if (includeFields == null) {
+              includeFields = new ArrayList<String>();
+            }
+            for (Map.Entry<String, Object> e : tuple.entrySet()) {
+              includeFields.add(e.getKey());
+            }
+          }
+          ArrayList<Object> includeTuple = new ArrayList<Object>();
+          for (String s : includeFields) {
+            includeTuple.add(tuple.get(s));
+          }
+          result.put(getKey(tuple), includeTuple);
+        } catch (JsonProcessingException parseExp) {
+          logger.warn("Unable to parse line {}", line, parseExp);
+        }
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } finally {
+      // Close quietly (null-safe): throwing from finally would mask the primary exception.
+      IOUtils.closeQuietly(bin);
+      IOUtils.closeQuietly(in);
+      IOUtils.closeQuietly(fs);
+    }
+    logger.debug("loading initial data {}", result.size());
+    return result;
+  }
+
+  /**
+   * Build the composite lookup key: the record's values for each configured lookup field.
+   */
+  private Object getKey(Map<String, Object> tuple)
+  {
+    ArrayList<Object> lst = new ArrayList<Object>();
+    for (String key : lookupFields) {
+      lst.add(tuple.get(key));
+    }
+    return lst;
+  }
+
+  // Point lookups are not supported: all data is served from the preloaded primary cache.
+  @Override
+  public Object get(Object key)
+  {
+    return null;
+  }
+
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+    return null;
+  }
+
+  @Override
+  public void connect() throws IOException
+  {
+    Configuration conf = new Configuration();
+    this.filePath = new Path(fileName);
+    this.fs = FileSystem.newInstance(filePath.toUri(), conf);
+    if (!fs.isFile(filePath)) {
+      throw new IOException("Provided path " + fileName + " is not a file");
+    }
+    connected = true;
+  }
+
+  @Override
+  public void disconnect() throws IOException
+  {
+    connected = false;
+    if (fs != null) {
+      fs.close();
+    }
+  }
+
+  @Override
+  public boolean isConnected()
+  {
+    return connected;
+  }
+
+  // The file is loaded once; the data is treated as static.
+  @Override
+  public boolean needRefresh()
+  {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
new file mode 100755
index 0000000..7040a7a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/HBaseLoader.java
@@ -0,0 +1,128 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.contrib.hbase.HBaseStore;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <p>HBaseLoader extends from {@link HBaseStore}, uses HBase to connect and implements the EnrichmentBackup interface.</p> <br/>
+ *
+ * Properties:<br>
+ * <b>includeFamilys</b>: List of comma separated families; each family corresponds (by position) to a column field in includeFieldsStr. Ex: Family1,Family2<br>
+ * <br>
+ *
+ * @displayName HBaseLoader
+ * @tags Loader
+ * @since 2.1.0
+ */
+public class HBaseLoader extends HBaseStore implements EnrichmentBackup
+{
+  protected transient List<String> includeFields;
+  protected transient List<String> lookupFields;
+  protected transient List<String> includeFamilys;
+
+  /**
+   * Issue an HBase Get for the row identified by the first lookup-key component,
+   * restricted to the configured family/qualifier pairs.
+   * Requires setIncludeFamilyStr to have been called when includeFields is configured.
+   */
+  protected Object getQueryResult(Object key)
+  {
+    try {
+      Get get = new Get(getRowBytes(((ArrayList)key).get(0)));
+      int idx = 0;
+      for (String f : includeFields) {
+        get.addColumn(Bytes.toBytes(includeFamilys.get(idx++)), Bytes.toBytes(f));
+      }
+      return getTable().get(get);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Extract column values from the Get result, positionally aligned with includeFields.
+   * When includeFields is empty, it (and includeFamilys) are derived from the first result.
+   *
+   * @return column values, or null when the row does not exist
+   */
+  protected ArrayList<Object> getDataFrmResult(Object result)
+  {
+    Result res = (Result)result;
+    if (res == null || res.isEmpty()) {
+      return null;
+    }
+    ArrayList<Object> columnInfo = new ArrayList<Object>();
+
+    if (CollectionUtils.isEmpty(includeFields)) {
+      if (includeFields == null) {
+        includeFields = new ArrayList<String>();
+        includeFamilys = new ArrayList<String>();
+      }
+      for (KeyValue kv : res.raw()) {
+        includeFields.add(new String(kv.getQualifier()));
+        includeFamilys.add(new String(kv.getFamily()));
+      }
+    }
+    for (KeyValue kv : res.raw()) {
+      columnInfo.add(kv.getValue());
+    }
+    return columnInfo;
+  }
+
+  private byte[] getRowBytes(Object key)
+  {
+    // Bytes.toBytes uses UTF-8; String.getBytes() would depend on the platform charset.
+    return Bytes.toBytes((String)key);
+  }
+
+  @Override
+  public void setFields(List<String> lookupFields, List<String> includeFields)
+  {
+    this.includeFields = includeFields;
+    this.lookupFields = lookupFields;
+  }
+
+  /**
+   * Set the familyStr and would be in the form of comma separated list.
+   */
+  public void setIncludeFamilyStr(String familyStr)
+  {
+    this.includeFamilys = Arrays.asList(familyStr.split(","));
+  }
+
+  @Override
+  public boolean needRefresh()
+  {
+    return false;
+  }
+
+  // Lookups are served per key; there is no bulk initial load.
+  @Override
+  public Map<Object, Object> loadInitialData()
+  {
+    return null;
+  }
+
+  @Override
+  public Object get(Object key)
+  {
+    return getDataFrmResult(getQueryResult(key));
+  }
+
+  @Override
+  public List<Object> getAll(List<Object> keys)
+  {
+    List<Object> values = Lists.newArrayList();
+    for (Object key : keys) {
+      values.add(get(key));
+    }
+    return values;
+  }
+
+  // This backup is read-only; all mutating operations are rejected.
+  @Override
+  public void put(Object key, Object value)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void putAll(Map<Object, Object> m)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+
+  @Override
+  public void remove(Object key)
+  {
+    throw new UnsupportedOperationException("Not supported operation");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
new file mode 100644
index 0000000..3b1a8cf
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/JDBCLoader.java
@@ -0,0 +1,158 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.db.jdbc.JdbcStore;
+import com.google.common.collect.Lists;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+
+/**
+ * <p>HBaseLoader extends from {@link JdbcStore} uses JDBC to connect and implements EnrichmentBackup interface.</p> <br/>
+ *
+ * Properties:<br>
+ * <b>queryStmt</b>: Sql Prepared Statement which needs to be executed<br>
+ * <b>tableName</b>: JDBC table name<br>
+ * <br>
+ *
+ * @displayName JDBCLoader
+ * @tags Loader
+ * @since 2.1.0
+ */
+public class JDBCLoader extends JdbcStore implements EnrichmentBackup
+{
+  protected String queryStmt;
+
+  protected String tableName;
+
+  protected transient List<String> includeFields;
+  protected transient List<String> lookupFields;
+
+  protected Object getQueryResult(Object key)
+  {
+    try {
+      PreparedStatement getStatement = getConnection().prepareStatement(queryStmt);
+      ArrayList<Object> keys = (ArrayList<Object>) key;
+      for (int i = 0; i < keys.size(); i++) {
+        getStatement.setObject(i+1, keys.get(i));
+      }
+      return getStatement.executeQuery();
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected ArrayList<Object> getDataFrmResult(Object result) throws RuntimeException
+  {
+    try {
+      ResultSet resultSet = (ResultSet) result;
+      if (resultSet.next()) {
+        ResultSetMetaData rsdata = resultSet.getMetaData();
+        // If the includefields is empty, populate it from ResultSetMetaData
+        if(CollectionUtils.isEmpty(includeFields)) {
+          if(includeFields == null)
+            includeFields = new ArrayList<String>();
+          for (int i = 1; i <= rsdata.getColumnCount(); i++) {
+            includeFields.add(rsdata.getColumnName(i));
+          }
+        }
+        ArrayList<Object> res = new ArrayList<Object>();
+        for(String f : includeFields) {
+          res.add(resultSet.getObject(f));
+        }
+        return res;
+      } else
+        return null;
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String generateQueryStmt()
+  {
+    String stmt = "select * from " + tableName + " where ";
+    for (int i = 0; i < lookupFields.size(); i++) {
+      stmt = stmt + lookupFields.get(i) + " = ? ";
+      if(i != lookupFields.size() - 1) {
+        stmt = stmt + " and ";
+      }
+    }
+    logger.info("generateQueryStmt: {}", stmt);
+    return stmt;
+  }
+
+  public String getQueryStmt()
+  {
+    return queryStmt;
+  }
+
+  @Override
+  public boolean needRefresh() {
+    return false;
+  }
+
+  /**
+   * Set the sql Prepared Statement if the enrichment mechanism is query based.
+   */
+  public void setQueryStmt(String queryStmt)
+  {
+    this.queryStmt = queryStmt;
+  }
+
+  public String getTableName()
+  {
+    return tableName;
+  }
+  /**
+   * Set the table name.
+   */
+  public void setTableName(String tableName)
+  {
+    this.tableName = tableName;
+  }
+
+  @Override public void setFields(List<String> lookupFields,List<String> includeFields)
+  {
+    this.includeFields = includeFields;
+    this.lookupFields = lookupFields;
+    if(queryStmt == null)
+      queryStmt = generateQueryStmt();
+  }
+  @Override public Map<Object, Object> loadInitialData()
+  {
+    return null;
+  }
+
+  @Override public Object get(Object key)
+  {
+    return getDataFrmResult(getQueryResult(key));
+  }
+
+  @Override public List<Object> getAll(List<Object> keys)
+  {
+    List<Object> values = Lists.newArrayList();
+    for (Object key : keys) {
+      values.add(get(key));
+    }
+    return values;
+  }
+
+  @Override public void put(Object key, Object value)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+
+  @Override public void putAll(Map<Object, Object> m)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+
+  @Override public void remove(Object key)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
new file mode 100644
index 0000000..040b5ae
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperator.java
@@ -0,0 +1,57 @@
+package com.datatorrent.contrib.enrichment;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ *
+ * This class takes a HashMap tuple as input and extracts the value of the lookupKey configured
+ * for this operator. It then does a lookup in file/DB to find a matching entry and all key-value pairs
+ * specified in the file/DB or based on include fields are added to the original tuple.
+ *
+ * Example
+ * The file contains data in JSON format, one entry per line. During setup the entire file is read and
+ * kept in memory for quick lookup.
+ * If the file contains the following lines, and the operator is configured with lookup key "productId"
+ * { "productId": 1, "productCategory": 3 }
+ * { "productId": 4, "productCategory": 10 }
+ * { "productId": 3, "productCategory": 1 }
+ *
+ * And input tuple is
+ * { amount=10.0, channelId=4, productId=3 }
+ *
+ * The tuple is modified as below before operator emits it on output port.
+ * { amount=10.0, channelId=4, productId=3, productCategory=1 }
+ *
+ *
+ * @displayName MapEnrichment
+ * @category Database
+ * @tags enrichment, lookup
+ *
+ * @since 2.1.0
+ */
+public class MapEnrichmentOperator extends AbstractEnrichmentOperator<Map<String, Object>, Map<String, Object>>
+{
+  /**
+   * Build the composite lookup key from the tuple's values for each lookup field.
+   */
+  @Override
+  protected Object getKey(Map<String, Object> tuple)
+  {
+    ArrayList<Object> keyList = new ArrayList<Object>();
+    for (String key : lookupFields) {
+      keyList.add(tuple.get(key));
+    }
+    return keyList;
+  }
+
+  /**
+   * Add the cached values to the input map, positionally aligned with includeFields.
+   * The tuple is passed through unchanged when the lookup found nothing.
+   */
+  @Override
+  protected Map<String, Object> convert(Map<String, Object> in, Object cached)
+  {
+    if (cached == null) {
+      return in;
+    }
+
+    // The redundant post-cast null check was removed: a cast of a non-null
+    // reference can never produce null.
+    ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
+    for (int i = 0; i < includeFields.size(); i++) {
+      in.put(includeFields.get(i), newAttributes.get(i));
+    }
+    return in;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
new file mode 100644
index 0000000..f668683
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/NullValuesCacheManager.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.db.cache.CacheManager;
+
+/**
+ * CacheManager that also caches misses: when the backup store returns null for a
+ * key, a sentinel is stored in the primary cache so the backup is not queried
+ * again for that key while the entry lives.
+ *
+ * @since 3.1.0
+ */
+public class NullValuesCacheManager extends CacheManager
+{
+  /** Sentinel stored in the primary cache to mark a known miss. */
+  private static final NullObject NULL = new NullObject();
+
+  @Override
+  public Object get(Object key)
+  {
+    Object cached = primary.get(key);
+    // A cached sentinel means the backup already answered "not found".
+    if (cached == NULL) {
+      return null;
+    }
+    if (cached != null) {
+      return cached;
+    }
+
+    // Primary miss: consult the backup and remember the answer either way.
+    Object fetched = backup.get(key);
+    primary.put(key, fetched == null ? NULL : fetched);
+    return fetched;
+  }
+
+  /** Private marker type; identity comparison distinguishes it from real values. */
+  private static class NullObject
+  {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
new file mode 100644
index 0000000..e707198
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/POJOEnrichmentOperator.java
@@ -0,0 +1,185 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.esotericsoftware.kryo.NotNull;
+import org.apache.commons.lang3.ClassUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * This class takes a POJO as input and extract the value of the lookupKey configured
+ * for this operator. It then does a lookup in file/DB to find matching entry and all key-value pairs
+ * specified in the file/DB or based on include fieldMap are added to original tuple.
+ *
+ * Properties:<br>
+ * <b>inputClass</b>: Class to be loaded for the incoming data type<br>
+ * <b>outputClass</b>: Class to be loaded for the emitted data type<br>
+ * <br>
+ *
+ * Example
+ * The file contains data in json format, one entry per line. during setup entire file is read and
+ * kept in memory for quick lookup.
+ * If file contains following lines, and operator is configured with lookup key "productId"
+ * { "productId": 1, "productCategory": 3 }
+ * { "productId": 4, "productCategory": 10 }
+ * { "productId": 3, "productCategory": 1 }
+ *
+ * And input tuple is
+ * { amount=10.0, channelId=4, productId=3 }
+ *
+ * The tuple is modified as below before operator emits it on output port.
+ * { amount=10.0, channelId=4, productId=3, productCategory=1 }
+ *
+ * @displayName BeanEnrichment
+ * @category Database
+ * @tags enrichment, lookup
+ *
+ * @since 2.1.0
+ */
+public class POJOEnrichmentOperator extends AbstractEnrichmentOperator<Object, Object> {
+
+  private transient static final Logger logger = LoggerFactory.getLogger(POJOEnrichmentOperator.class);
+  // inputClass is bound lazily from the first tuple in processTuple; outputClass
+  // is resolved from outputClassStr in populateUpdatesFrmIncludeFields.
+  protected Class inputClass;
+  protected Class outputClass;
+  // getters: one per lookup field, used to build the composite key.
+  private transient List<Getter> getters = new LinkedList<Getter>();
+  // fieldMap: getter/setter pairs used to copy every public input field to the output.
+  private transient List<FieldObjectMap> fieldMap = new LinkedList<FieldObjectMap>();
+  // updateSetter: one setter per include field, used to write the cached values.
+  private transient List<Setter> updateSetter = new LinkedList<Setter>();
+
+  // NOTE(review): this @NotNull is Kryo's serialization hint, not bean validation
+  // (javax.validation) — confirm which was intended.
+  @NotNull
+  protected String outputClassStr;
+
+
+  /**
+   * Builds the composite lookup key by applying the lookup-field getters to the tuple.
+   */
+  @Override
+  protected Object getKey(Object tuple) {
+    ArrayList<Object> keyList = new ArrayList<Object>();
+    for(Getter g : getters) {
+        keyList.add(g.get(tuple));
+    }
+    return keyList;
+  }
+
+  /**
+   * Creates a new output POJO, copies all public fields from the input, then writes the
+   * cached lookup values (positionally aligned with includeFields) via updateSetter.
+   */
+  @Override
+  protected Object convert(Object in, Object cached) {
+    try {
+      Object o = outputClass.newInstance();
+
+      // Copy the fields from input to output
+      for (FieldObjectMap map : fieldMap) {
+        map.set.set(o, map.get.get(in));
+      }
+
+      if (cached == null)
+        return o;
+
+      // Lazily build the include-field setters if setup ran before includeFields was known.
+      if(updateSetter.size() == 0 && includeFields.size() != 0) {
+        populateUpdatesFrmIncludeFields();
+      }
+      ArrayList<Object> newAttributes = (ArrayList<Object>)cached;
+      int idx = 0;
+      for(Setter s: updateSetter) {
+        s.set(o, newAttributes.get(idx));
+        idx++;
+      }
+      return o;
+    } catch (InstantiationException e) {
+      throw new RuntimeException(e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Resolves the output class and prepares the include-field setters.
+   */
+  @Override
+  public void setup(Context.OperatorContext context) {
+    super.setup(context);
+    populateUpdatesFrmIncludeFields();
+  }
+
+  // Build one getter per lookup field against the (lazily bound) input class.
+  private void populateGettersFrmLookup()
+  {
+    for (String fName : lookupFields) {
+        Getter f = PojoUtils.createGetter(inputClass, fName, Object.class);
+        getters.add(f);
+    }
+  }
+
+  // Pair a getter on the input class with a setter on the output class for every
+  // public input field, so convert can copy the tuple wholesale.
+  private void populateGettersFrmInput()
+  {
+    Field[] fields = inputClass.getFields();
+    for (Field f : fields) {
+      Class c = ClassUtils.primitiveToWrapper(f.getType());
+      FieldObjectMap fieldMap = new FieldObjectMap();
+      fieldMap.get = PojoUtils.createGetter(inputClass, f.getName(), c);
+      try {
+        fieldMap.set = PojoUtils.createSetter(outputClass, f.getName(), c);
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
+      }
+      this.fieldMap.add(fieldMap);
+    }
+  }
+
+  // Resolve outputClass from outputClassStr (once) and create a setter on it for
+  // each include field; primitives are widened to their wrapper types for PojoUtils.
+  private void populateUpdatesFrmIncludeFields() {
+    if (this.outputClass == null) {
+      logger.debug("Creating output class instance from string: {}", outputClassStr);
+      try {
+        this.outputClass = this.getClass().getClassLoader().loadClass(outputClassStr);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    for (String fName : includeFields) {
+      try {
+        Field f = outputClass.getField(fName);
+        Class c;
+        if(f.getType().isPrimitive()) {
+          c = ClassUtils.primitiveToWrapper(f.getType());
+        } else {
+           c = f.getType();
+        }
+        try {
+          updateSetter.add(PojoUtils.createSetter(outputClass, f.getName(), c));
+        } catch (Throwable e) {
+          throw new RuntimeException("Failed to initialize Output Class for field: " + f.getName(), e);
+        }
+      } catch (NoSuchFieldException e) {
+        throw new RuntimeException("Cannot find field '" + fName + "' in output class", e);
+      }
+    }
+  }
+
+  public String getOutputClassStr()
+  {
+    return outputClassStr;
+  }
+
+  /**
+   * Set the fully qualified class name of the emitted POJO type.
+   */
+  public void setOutputClassStr(String outputClassStr)
+  {
+    this.outputClassStr = outputClassStr;
+  }
+
+  /**
+   * Binds inputClass and the reflection accessors from the first tuple seen,
+   * then delegates to the base lookup/enrich/emit flow.
+   */
+  @Override protected void processTuple(Object tuple)
+  {
+    if (inputClass == null) {
+      inputClass = tuple.getClass();
+      populateGettersFrmLookup();
+      populateGettersFrmInput();
+    }
+    super.processTuple(tuple);
+  }
+
+  // Simple getter/setter pair for mirroring one field from input to output.
+  private class FieldObjectMap
+  {
+    public Getter get;
+    public Setter set;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
new file mode 100644
index 0000000..3357704
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/ReadOnlyBackup.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (c) 2015 DataTorrent, Inc.
+ * All rights reserved.
+ */
+package com.datatorrent.contrib.enrichment;
+
+import java.util.List;
+import java.util.Map;
+/**
+ * @since 3.1.0
+ */
+
+public abstract class ReadOnlyBackup implements EnrichmentBackup
+{
+  protected transient List<String> includeFields;
+  protected transient List<String> lookupFields;
+
+  @Override public void put(Object key, Object value)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+
+  @Override public void putAll(Map<Object, Object> m)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+
+  @Override public void remove(Object key)
+  {
+    throw new RuntimeException("Not supported operation");
+  }
+
+  @Override public void setFields(List<String> lookupFields,List<String> includeFields)
+  {
+    this.includeFields = includeFields;
+    this.lookupFields = lookupFields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java b/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
new file mode 100644
index 0000000..7d5b4cd
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrichment/package-info.java
@@ -0,0 +1 @@
+package com.datatorrent.contrib.enrichment;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
new file mode 100644
index 0000000..308aa82
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/BeanEnrichmentOperatorTest.java
@@ -0,0 +1,95 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BeanEnrichmentOperatorTest extends JDBCLoaderTest
+{
+  public class Order {
+    public int OID;
+    public int ID;
+    public double amount;
+
+    public Order(int oid, int id, double amount) {
+      this.OID = oid;
+      this.ID = id;
+      this.amount = amount;
+    }
+    public int getOID()
+    {
+      return OID;
+    }
+
+    public void setOID(int OID)
+    {
+      this.OID = OID;
+    }
+
+    public int getID()
+    {
+      return ID;
+    }
+
+    public void setID(int ID)
+    {
+      this.ID = ID;
+    }
+
+    public double getAmount()
+    {
+      return amount;
+    }
+
+    public void setAmount(double amount)
+    {
+      this.amount = amount;
+    }
+  }
+
+
+  @Test
+  public void includeSelectedKeys()
+  {
+    POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFieldsStr("ID");
+    oper.setIncludeFieldsStr("NAME,AGE,ADDRESS");
+    oper.outputClass = EmployeeOrder.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}", sink.collectedTuples.get(0).toString());
+  }
+  @Test
+  public void includeAllKeys()
+  {
+    POJOEnrichmentOperator oper = new POJOEnrichmentOperator();
+    oper.setStore(testMeta.dbloader);
+    oper.setLookupFieldsStr("ID");
+    oper.outputClass = EmployeeOrder.class;
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+
+    oper.beginWindow(1);
+    Order tuple = new Order(3, 4, 700);
+    oper.input.process(tuple);
+    oper.endWindow();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Ouput Tuple: ", "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}", sink.collectedTuples.get(0).toString());
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
new file mode 100644
index 0000000..00c9d82
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/EmployeeOrder.java
@@ -0,0 +1,95 @@
+package com.datatorrent.contrib.enrichment;
+
/**
 * Output POJO for the Bean enrichment operator tests: the input Order fields
 * (OID, ID, amount) plus the columns looked up from the employee table
 * (NAME, AGE, ADDRESS, SALARY).
 *
 * <p>Fields are public so they can be accessed reflectively by the operator's
 * field discovery (which uses {@code getField}).
 */
public class EmployeeOrder {
  public int OID;
  public int ID;
  public double amount;
  public String NAME;
  public int AGE;
  public String ADDRESS;
  public double SALARY;

  public int getOID()
  {
    return OID;
  }

  public void setOID(int OID)
  {
    this.OID = OID;
  }

  public int getID()
  {
    return ID;
  }

  public void setID(int ID)
  {
    this.ID = ID;
  }

  public int getAGE()
  {
    return AGE;
  }

  public void setAGE(int AGE)
  {
    this.AGE = AGE;
  }

  public String getNAME()
  {
    return NAME;
  }

  public void setNAME(String NAME)
  {
    this.NAME = NAME;
  }

  public double getAmount()
  {
    return amount;
  }

  public void setAmount(double amount)
  {
    this.amount = amount;
  }

  public String getADDRESS()
  {
    return ADDRESS;
  }

  public void setADDRESS(String ADDRESS)
  {
    this.ADDRESS = ADDRESS;
  }

  public double getSALARY()
  {
    return SALARY;
  }

  public void setSALARY(double SALARY)
  {
    this.SALARY = SALARY;
  }

  /**
   * String form used by the test assertions. ADDRESS is trimmed because the
   * backing column is CHAR(50) and comes back space-padded; the null check
   * fixes an NPE when toString() is called before enrichment sets ADDRESS.
   */
  @Override
  public String toString()
  {
    return "{" +
        "OID=" + OID +
        ", ID=" + ID +
        ", amount=" + amount +
        ", NAME='" + NAME + '\'' +
        ", AGE=" + AGE +
        ", ADDRESS='" + (ADDRESS == null ? null : ADDRESS.trim()) + '\'' +
        ", SALARY=" + SALARY +
        '}';
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
new file mode 100644
index 0000000..934d73b
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/FileEnrichmentTest.java
@@ -0,0 +1,75 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+
+
+public class FileEnrichmentTest
+{
+
+  @Rule public final TestUtils.TestInfo testInfo = new TestUtils.TestInfo();
+
+  @Test public void testEnrichmentOperator() throws IOException, InterruptedException
+  {
+    URL origUrl = this.getClass().getResource("/productmapping.txt");
+
+    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping1.txt");
+    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
+    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
+
+    MapEnrichmentOperator oper = new MapEnrichmentOperator();
+    FSLoader store = new FSLoader();
+    store.setFileName(fileUrl.toString());
+    oper.setLookupFieldsStr("productId");
+    oper.setStore(store);
+
+    oper.setup(null);
+
+    /* File contains 6 entries, but operator one entry is duplicate,
+     * so cache should contains only 5 entries after scanning input file.
+     */
+    //Assert.assertEquals("Number of mappings ", 7, oper.cache.size());
+
+    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<Map<String, Object>>();
+    @SuppressWarnings({ "unchecked", "rawtypes" }) CollectorTestSink<Object> tmp = (CollectorTestSink) sink;
+    oper.output.setSink(tmp);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("productId", 3);
+    tuple.put("channelId", 4);
+    tuple.put("amount", 10.0);
+
+    Kryo kryo = new Kryo();
+    oper.input.process(kryo.copy(tuple));
+
+    oper.endWindow();
+
+    /* Number of tuple, emitted */
+    Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+    /* The fields present in original event is kept as it is */
+    Assert.assertEquals("Number of fields in emitted tuple", 4, emitted.size());
+    Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
+    Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
+    Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
+
+    /* Check if productCategory is added to the event */
+    Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
+    Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
new file mode 100644
index 0000000..07be982
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/HBaseLoaderTest.java
@@ -0,0 +1,162 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.netlet.util.DTThrowable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.LoggerFactory;
+
+public class HBaseLoaderTest
+{
+  static final org.slf4j.Logger logger = LoggerFactory.getLogger(HBaseLoaderTest.class);
+
+  public static class TestMeta extends TestWatcher
+  {
+
+    HBaseLoader dbloader;
+    @Override
+    protected void starting(Description description)
+    {
+      try {
+        dbloader = new HBaseLoader();
+        Configuration conf = HBaseConfiguration.create();
+        conf.addResource(new Path("file:///home/chaitanya/hbase-site.xml"));
+
+        dbloader.setConfiguration(conf);
+        dbloader.setZookeeperQuorum("localhost");
+        dbloader.setZookeeperClientPort(2181);
+
+        dbloader.setTableName("EMPLOYEE");
+
+        dbloader.connect();
+        createTable();
+        insertRecordsInTable();
+      }
+      catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    private void createTable()
+    {
+      try {
+        String[] familys = { "personal", "professional" };
+        HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
+        HTableDescriptor tableDesc = new HTableDescriptor(dbloader.getTableName());
+        for (int i = 0; i < familys.length; i++) {
+          tableDesc.addFamily(new HColumnDescriptor(familys[i]));
+        }
+        admin.createTable(tableDesc);
+
+        logger.debug("Table  created successfully...");
+      }
+      catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+
+    @SuppressWarnings("deprecation")
+    public void addRecord(String rowKey, String family, String qualifier, String value) throws Exception {
+      try {
+        HTable table = new HTable(dbloader.getConfiguration(), dbloader.getTableName());
+        Put put = new Put(Bytes.toBytes(rowKey));
+        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
+            .toBytes(value));
+        table.put(put);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
+    private void insertRecordsInTable()
+    {
+      try {
+        addRecord("row1", "personal", "name", "raju");
+        addRecord("row1", "personal", "city", "hyderabad");
+        addRecord("row1", "professional", "designation", "manager");
+        addRecord("row1", "professional", "Salary", "50000");
+
+        addRecord("row2", "personal", "name", "ravi");
+        addRecord("row2", "personal", "city", "Chennai");
+        addRecord("row2", "professional", "designation", "SE");
+        addRecord("row2", "professional", "Salary", "30000");
+
+        addRecord("row3", "personal", "name", "rajesh");
+        addRecord("row3", "personal", "city", "Delhi");
+        addRecord("row3", "professional", "designation", "E");
+        addRecord("row3", "professional", "Salary", "10000");
+      }
+      catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+
+    }
+
+    private void cleanTable()
+    {
+      String sql = "delete from  " + dbloader.getTableName();
+      try {
+        HBaseAdmin admin = new HBaseAdmin(dbloader.getConfiguration());
+        admin.disableTable(dbloader.getTableName());
+        admin.deleteTable(dbloader.getTableName());
+      } catch (MasterNotRunningException e) {
+        e.printStackTrace();
+      } catch (ZooKeeperConnectionException e) {
+        e.printStackTrace();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      cleanTable();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testHBaseLookup() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ArrayList<String> includeKeys = new ArrayList<String>();
+    includeKeys.add("city");
+    includeKeys.add("Salary");
+    ArrayList<String> lookupKeys = new ArrayList<String>();
+    lookupKeys.add("ID");
+    testMeta.dbloader.setFields(lookupKeys, includeKeys);
+
+    String includeFamilyStr = "personal, professional";
+    testMeta.dbloader.setIncludeFamilyStr(includeFamilyStr);
+
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<Object>();
+    keys.add("row2");
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("CITY", "Chennai", columnInfo.get(0).toString().trim());
+    Assert.assertEquals("Salary", 30000, columnInfo.get(1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
new file mode 100644
index 0000000..72cfb88
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/JDBCLoaderTest.java
@@ -0,0 +1,179 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.netlet.util.DTThrowable;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
+
+public class JDBCLoaderTest
+{
+  static final org.slf4j.Logger logger = LoggerFactory.getLogger(JDBCLoaderTest.class);
+
+  public static class TestMeta extends TestWatcher
+  {
+    JDBCLoader dbloader;
+    @Override
+    protected void starting(Description description)
+    {
+        try {
+          dbloader = new JDBCLoader();
+          dbloader.setDatabaseDriver("org.hsqldb.jdbcDriver");
+          dbloader.setDatabaseUrl("jdbc:hsqldb:mem:test;sql.syntax_mys=true");
+          dbloader.setTableName("COMPANY");
+
+          dbloader.connect();
+          createTable();
+          insertRecordsInTable();
+        }
+        catch (Throwable e) {
+            DTThrowable.rethrow(e);
+        }
+    }
+
+    private void createTable()
+    {
+        try {
+            Statement stmt = dbloader.getConnection().createStatement();
+
+            String createTable = "CREATE TABLE IF NOT EXISTS " + dbloader.getTableName() +
+                    "(ID INT PRIMARY KEY     NOT NULL," +
+                    " NAME           TEXT    NOT NULL, " +
+                    " AGE            INT     NOT NULL, " +
+                    " ADDRESS        CHAR(50), " +
+                    " SALARY         REAL)";
+            stmt.executeUpdate(createTable);
+
+            logger.debug("Table  created successfully...");
+        }
+        catch (Throwable e) {
+            DTThrowable.rethrow(e);
+        }
+    }
+
+    private void insertRecordsInTable()
+    {
+      try {
+        Statement stmt = dbloader.getConnection().createStatement();
+        String tbName = dbloader.getTableName();
+
+        String sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+            "VALUES (1, 'Paul', 32, 'California', 20000.00 );";
+        stmt.executeUpdate(sql);
+
+        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+            "VALUES (2, 'Allen', 25, 'Texas', 15000.00 );";
+        stmt.executeUpdate(sql);
+
+        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+            "VALUES (3, 'Teddy', 23, 'Norway', 20000.00 );";
+        stmt.executeUpdate(sql);
+
+        sql = "INSERT INTO " + tbName + " (ID,NAME,AGE,ADDRESS,SALARY) " +
+            "VALUES (4, 'Mark', 25, 'Rich-Mond', 65000.00 );";
+        stmt.executeUpdate(sql);
+      }
+      catch (Throwable e) {
+        DTThrowable.rethrow(e);
+      }
+
+    }
+
+    private void cleanTable()
+    {
+        String sql = "delete from  " + dbloader.tableName;
+        try {
+          Statement stmt = dbloader.getConnection().createStatement();
+          stmt.executeUpdate(sql);
+          logger.debug("Table deleted successfully...");
+        } catch (SQLException e) {
+          DTThrowable.rethrow(e);
+        }
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      cleanTable();
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testMysqlDBLookup() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ArrayList<String> lookupKeys = new ArrayList<String>();
+    lookupKeys.add("ID");
+    ArrayList<String> includeKeys = new ArrayList<String>();
+    includeKeys.add("NAME");
+    includeKeys.add("AGE");
+    includeKeys.add("ADDRESS");
+    testMeta.dbloader.setFields(lookupKeys, includeKeys);
+
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<Object>();
+    keys.add("4");
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("NAME", "Mark", columnInfo.get(0).toString().trim());
+    Assert.assertEquals("AGE", 25, columnInfo.get(1));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(2).toString().trim());
+  }
+
+  @Test
+  public void testMysqlDBLookupIncludeAllKeys() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    ArrayList<String> lookupKeys = new ArrayList<String>();
+    lookupKeys.add("ID");
+    ArrayList<String> includeKeys = new ArrayList<String>();
+    testMeta.dbloader.setFields(lookupKeys, includeKeys);
+
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<Object>();
+    keys.add("4");
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("ID", 4, columnInfo.get(0));
+    Assert.assertEquals("NAME", "Mark", columnInfo.get(1).toString().trim());
+    Assert.assertEquals("AGE", 25, columnInfo.get(2));
+    Assert.assertEquals("ADDRESS", "Rich-Mond", columnInfo.get(3).toString().trim());
+    Assert.assertEquals("SALARY", 65000.0, columnInfo.get(4));
+  }
+
+  @Test
+  public void testMysqlDBQuery() throws Exception
+  {
+    CountDownLatch latch = new CountDownLatch(1);
+
+    testMeta.dbloader.setQueryStmt("Select id, name from " + testMeta.dbloader.getTableName() + " where AGE = ? and ADDRESS = ?");
+
+    latch.await(1000, TimeUnit.MILLISECONDS);
+
+    ArrayList<Object> keys = new ArrayList<Object>();
+    keys.add("25");
+    keys.add("Texas");
+
+    ArrayList<Object> columnInfo = (ArrayList<Object>) testMeta.dbloader.get(keys);
+
+    Assert.assertEquals("ID", 2, columnInfo.get(0));
+    Assert.assertEquals("NAME", "Allen", columnInfo.get(1).toString().trim());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/528e7aca/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
new file mode 100644
index 0000000..845fe80
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrichment/MapEnrichmentOperatorTest.java
@@ -0,0 +1,152 @@
+package com.datatorrent.contrib.enrichment;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MapEnrichmentOperatorTest
+{
+  @Test
+  public void includeAllKeys()
+  {
+    MapEnrichmentOperator oper = new MapEnrichmentOperator();
+    oper.setStore(new MemoryStore());
+    oper.setLookupFieldsStr("In1");
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    Map<String, Object> inMap = Maps.newHashMap();
+    inMap.put("In1", "Value1");
+    inMap.put("In2", "Value2");
+
+    oper.beginWindow(1);
+    oper.input.process(inMap);
+    oper.endWindow();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, C=Val_C, In2=Value2, In1=Value3}", sink.collectedTuples.get(0).toString());
+  }
+
+  @Test
+  public void includeSelectedKeys()
+  {
+    MapEnrichmentOperator oper = new MapEnrichmentOperator();
+    oper.setStore(new MemoryStore());
+    oper.setLookupFieldsStr("In1");
+    oper.setIncludeFieldsStr("A,B");
+    oper.setup(null);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    TestUtils.setSink(oper.output, sink);
+
+    Map<String, Object> inMap = Maps.newHashMap();
+    inMap.put("In1", "Value1");
+    inMap.put("In2", "Value2");
+
+    oper.beginWindow(1);
+    oper.input.process(inMap);
+    oper.endWindow();
+
+    Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Enrich Tuple: ", "{A=Val_A, B=Val_B, In2=Value2, In1=Value1}", sink.collectedTuples.get(0).toString());
+  }
+
+  private static class MemoryStore implements EnrichmentBackup
+  {
+    static Map<String, Map> returnData = Maps.newHashMap();
+    private List<String> includeFields;
+    static {
+      Map<String, String> map = Maps.newHashMap();
+      map.put("A", "Val_A");
+      map.put("B", "Val_B");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value1", map);
+
+      map = Maps.newHashMap();
+      map.put("A", "Val_A_1");
+      map.put("B", "Val_B_1");
+      map.put("C", "Val_C");
+      map.put("In1", "Value3");
+      returnData.put("Value2", map);
+    }
+
+    @Override public Map<Object, Object> loadInitialData()
+    {
+      return null;
+    }
+
+    @Override public Object get(Object key)
+    {
+      List<String> keyList = (List<String>)key;
+      Map<String, String> keyValue = returnData.get(keyList.get(0));
+      ArrayList<Object> lst = new ArrayList<Object>();
+      if(CollectionUtils.isEmpty(includeFields)) {
+        if(includeFields == null)
+          includeFields = new ArrayList<String>();
+        for (Map.Entry<String, String> e : keyValue.entrySet()) {
+          includeFields.add(e.getKey());
+        }
+      }
+      for(String field : includeFields) {
+        lst.add(keyValue.get(field));
+      }
+      return lst;
+    }
+
+    @Override public List<Object> getAll(List<Object> keys)
+    {
+      return null;
+    }
+
+    @Override public void put(Object key, Object value)
+    {
+
+    }
+
+    @Override public void putAll(Map<Object, Object> m)
+    {
+
+    }
+
+    @Override public void remove(Object key)
+    {
+
+    }
+
+    @Override public void connect() throws IOException
+    {
+
+    }
+
+    @Override public void disconnect() throws IOException
+    {
+
+    }
+
+    @Override public boolean isConnected()
+    {
+      return false;
+    }
+
+    @Override public void setFields(List<String> lookupFields, List<String> includeFields)
+    {
+      this.includeFields = includeFields;
+
+    }
+
+    @Override
+    public boolean needRefresh() {
+      return false;
+    }
+  }
+}


Mime
View raw message