metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [41/51] [partial] incubator-metron git commit: Initial import of code from https://github.com/OpenSOC/opensoc at ac0b00373f8f56dfae03a8109af5feb373ea598e.
Date Tue, 08 Dec 2015 06:38:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapGetterHBaseImpl.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapGetterHBaseImpl.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapGetterHBaseImpl.java
new file mode 100644
index 0000000..9d85639
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapGetterHBaseImpl.java
@@ -0,0 +1,790 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.NoServerForRegionException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Singleton class which integrates with HBase table and returns pcaps sorted by
+ * timestamp(dsc) for the given list of keys. Creates HConnection if it is not
+ * already created and the same connection instance is being used for all
+ * requests
+ * 
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapGetterHBaseImpl implements IPcapGetter {
+
+  /** The pcap getter h base. */
+  private static IPcapGetter pcapGetterHBase = null;
+
+  /** The Constant LOG. */
+  private static final Logger LOGGER = Logger
+      .getLogger(PcapGetterHBaseImpl.class);
+
+  /**
+   * Fetches pcaps for the given keys, optionally resuming after the last row
+   * of a previous partial response.
+   * <p>
+   * If 'lastRowKey' is present it is processed first. The input keys are then
+   * de-duplicated, sorted and processed one by one. Processing stops as soon
+   * as the response is marked {@code PARTIAL}.
+   * </p>
+   *
+   * @param keys
+   *          input keys; may be null or empty when 'lastRowKey' is present
+   * @param lastRowKey
+   *          last row key of the previous partial response; may be null or
+   *          empty when 'keys' is present
+   * @param startTime
+   *          start of the time range; negative means "not set"
+   * @param endTime
+   *          end of the time range; negative means "not set"
+   * @param includeReverseTraffic
+   *          whether the reversed keys are queried as well
+   * @param includeDuplicateLastRow
+   *          whether the row identified by 'lastRowKey' is fetched again
+   * @param maxResultSize
+   *          response size limit handed to the scan and the size check
+   * @return the (possibly partial) response
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List,
+   *      java.lang.String, long, long, boolean, boolean, long)
+   */
+  @Override
+  public PcapsResponse getPcaps(List<String> keys, String lastRowKey,
+      long startTime, long endTime, boolean includeReverseTraffic,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+    Assert
+        .isTrue(
+            checkIfValidInput(keys, lastRowKey),
+            "No valid input. One of the value must be present from {keys, lastRowKey}");
+    // String concatenation is null-safe; 'keys' may legitimately be null when
+    // only 'lastRowKey' was supplied.
+    LOGGER.info(" keys=" + keys + ";  lastRowKey=" + lastRowKey);
+
+    PcapsResponse pcapsResponse = new PcapsResponse();
+    boolean resuming = StringUtils.isNotEmpty(lastRowKey);
+
+    // 1. Process partial response key
+    if (resuming) {
+      pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime,
+          endTime, true, includeDuplicateLastRow, maxResultSize);
+      if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+        return pcapsResponse;
+      }
+    }
+
+    // 2. Process input keys; nothing more to do when only 'lastRowKey' was
+    // given (sortKeysByAscOrder rejects an empty key list).
+    if (CollectionUtils.isEmpty(keys)) {
+      return pcapsResponse;
+    }
+    List<String> sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic);
+    List<String> unprocessedKeys = sortedKeys;
+    if (resuming) {
+      unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys, lastRowKey);
+    }
+    LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString());
+    for (String unprocessedKey : unprocessedKeys) {
+      pcapsResponse = processKey(pcapsResponse, unprocessedKey, startTime,
+          endTime, false, includeDuplicateLastRow, maxResultSize);
+      if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+        return pcapsResponse;
+      }
+    }
+    return pcapsResponse;
+  }
+
+  /**
+   * Single-key convenience overload: wraps the key in a list and delegates to
+   * the full variant with no 'lastRowKey', no duplicate last row and the
+   * default result size.
+   *
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long,
+   *      long, boolean)
+   */
+  @Override
+  public PcapsResponse getPcaps(String key, long startTime, long endTime,
+      boolean includeReverseTraffic) throws IOException {
+    Assert.hasText(key, "key must not be null or empty");
+    final List<String> singleKey = Arrays.asList(key);
+    final long defaultResultSize = ConfigurationUtil.getDefaultResultSize();
+    return getPcaps(singleKey, null, startTime, endTime,
+        includeReverseTraffic, false, defaultResultSize);
+  }
+
+  /**
+   * Fetches pcaps for the given keys without a time range (both bounds are
+   * passed as -1), using the configured defaults for reverse traffic and
+   * result size.
+   *
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List)
+   */
+  @Override
+  public PcapsResponse getPcaps(List<String> keys) throws IOException {
+    Assert.notEmpty(keys, "'keys' must not be null or empty");
+    final boolean reverseByDefault = ConfigurationUtil
+        .isDefaultIncludeReverseTraffic();
+    final long defaultResultSize = ConfigurationUtil.getDefaultResultSize();
+    return getPcaps(keys, null, -1, -1, reverseByDefault, false,
+        defaultResultSize);
+  }
+
+  /**
+   * Fetches pcaps for a single key without a time range (both bounds are
+   * passed as -1), using the configured defaults for reverse traffic and
+   * result size.
+   *
+   * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String)
+   */
+  @Override
+  public PcapsResponse getPcaps(String key) throws IOException {
+    Assert.hasText(key, "key must not be null or empty");
+    final List<String> singleKey = Arrays.asList(key);
+    final boolean reverseByDefault = ConfigurationUtil
+        .isDefaultIncludeReverseTraffic();
+    final long defaultResultSize = ConfigurationUtil.getDefaultResultSize();
+    return getPcaps(singleKey, null, -1, -1, reverseByDefault, false,
+        defaultResultSize);
+  }
+
+  /**
+   * Returns the shared instance, creating it on first use.
+   * <p>
+   * The whole check runs under the class lock. The backing static field is
+   * not declared {@code volatile}, so an unsynchronized first read of it
+   * would be a data race.
+   * </p>
+   *
+   * @return IPcapGetter singleton instance
+   * @throws IOException
+   *           declared for callers; not thrown by the current implementation
+   */
+  public static IPcapGetter getInstance() throws IOException {
+    synchronized (PcapGetterHBaseImpl.class) {
+      if (pcapGetterHBase == null) {
+        pcapGetterHBase = new PcapGetterHBaseImpl();
+      }
+      return pcapGetterHBase;
+    }
+  }
+
+  /**
+   * Private no-op constructor: instances are only created inside this class
+   * (by getInstance() and by main()).
+   */
+  private PcapGetterHBaseImpl() {
+  }
+
+  /**
+   * Builds the de-duplicated list of keys to query, sorted in ascending
+   * (natural String) order. When 'includeReverseTraffic' is true the reversed
+   * keys produced by PcapHelper.reverseKey are included as well.
+   * <p>
+   * The caller's list is never modified: the keys are copied first, so
+   * fixed-size lists such as those returned by Arrays.asList are accepted.
+   * </p>
+   *
+   * @param keys
+   *          input keys; must not be null or empty
+   * @param includeReverseTraffic
+   *          flag whether or not to include reverse traffic
+   * @return a new, sorted list without duplicates
+   */
+  @VisibleForTesting
+  List<String> sortKeysByAscOrder(List<String> keys,
+      boolean includeReverseTraffic) {
+    Assert.notEmpty(keys, "'keys' must not be null");
+    // Work on a private copy; adding to 'keys' itself would mutate the
+    // caller's list and fails outright on fixed-size lists.
+    List<String> allKeys = new ArrayList<String>(keys);
+    if (includeReverseTraffic) {
+      allKeys.addAll(PcapHelper.reverseKey(keys));
+    }
+    List<String> deDupKeys = removeDuplicateKeys(allKeys);
+    Collections.sort(deDupKeys);
+    return deDupKeys;
+  }
+
+  /**
+   * Returns a new list holding each distinct key exactly once. The order of
+   * the returned list is whatever the intermediate HashSet yields.
+   *
+   * @param keys
+   *          the keys, possibly containing duplicates
+   * @return a new list without duplicates
+   */
+  @VisibleForTesting
+  List<String> removeDuplicateKeys(List<String> keys) {
+    final Set<String> uniqueKeys = new HashSet<String>(keys);
+    final List<String> result = new ArrayList<String>(uniqueKeys.size());
+    result.addAll(uniqueKeys);
+    return result;
+  }
+
+  /**
+   * <p>
+   * Returns the keys that still have to be processed after a partial
+   * response: the sublist starting right after the key that matches the first
+   * five tokens of 'lastRowKey'. If no key matches, the complete list is
+   * returned. The result is a view of 'keys' (List.subList), not a copy.
+   * </p>
+   *
+   * <pre>
+   * Eg :
+   *  keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+   *  lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
+   *  and the response from this method [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+   * </pre>
+   *
+   * @param keys
+   *          keys; must not be null or empty
+   * @param lastRowKey
+   *          last row key of the previous partial response; must have text
+   * @return List<String>
+   */
+  @VisibleForTesting
+  List<String> getUnprocessedSublistOfKeys(List<String> keys,
+      String lastRowKey) {
+    Assert.notEmpty(keys, "'keys' must not be null");
+    Assert.hasText(lastRowKey, "'lastRowKey' must not be null");
+    final String alreadyProcessedKey = getTokens(lastRowKey, 5);
+    // indexOf yields -1 when nothing matches, which makes the sublist start
+    // at 0 and therefore cover the whole list.
+    final int matchedAt = keys.indexOf(alreadyProcessedKey);
+    return keys.subList(matchedAt + 1, keys.size());
+  }
+
+  /**
+   * Returns the first 'noOfTokens' tokens of the given key, re-joined with
+   * the pcap key delimiter. 'noOfTokens' must be strictly smaller than the
+   * number of tokens in the key.
+   *
+   * @param key
+   *          given key
+   * @param noOfTokens
+   *          number of leading tokens to keep
+   * @return the leading tokens joined by the delimiter
+   */
+  @VisibleForTesting
+  String getTokens(String key, int noOfTokens) {
+    final String separator = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    final String[] parts = key.split("\\" + separator);
+    Assert.isTrue(noOfTokens < parts.length,
+        "Invalid value for 'noOfTokens'");
+    final StringBuilder joined = new StringBuilder();
+    for (int index = 0; index < noOfTokens; index++) {
+      if (index > 0) {
+        joined.append(separator);
+      }
+      joined.append(parts[index]);
+    }
+    return joined.toString();
+  }
+
+  /**
+   * Fetches the pcaps for one key and appends them to the response.
+   * <p>
+   * A key for which createStartAndStopRowKeys returns null is read with a
+   * single Get (no response size checks); any other key is read with a Scan
+   * between the computed start and stop rows.
+   * </p>
+   * <p>
+   * Error handling: every IOException is logged. For connection-level
+   * failures (ZooKeeperConnectionException, MasterNotRunningException,
+   * NoServerForRegionException) the connection is closed and the same request
+   * (Get or Scan) is retried up to the configured retry limit; the original
+   * exception is rethrown once the retries are exhausted. Other IOExceptions
+   * are not rethrown, and the response collected so far is returned.
+   * </p>
+   *
+   * @param pcapsResponse
+   *          the response to append to
+   * @param key
+   *          the key
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @param isPartialResponse
+   *          whether 'key' is the last row key of a previous partial response
+   * @param includeDuplicateLastRow
+   *          the include duplicate last row
+   * @param maxResultSize
+   *          the max result size
+   * @return the same response instance that was passed in
+   * @throws IOException
+   *           if a connection-level failure persists after all retries, or no
+   *           request could be built before such a failure
+   */
+  @VisibleForTesting
+  PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
+      long startTime, long endTime, boolean isPartialResponse,
+      boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
+    // Exactly one of the two is set once the request has been built; the
+    // retry path needs to know which one failed.
+    Get get = null;
+    Scan scan = null;
+    try {
+      // 1. Create start and stop row for the key; null means "use a Get".
+      Map<String, String> keysMap = createStartAndStopRowKeys(key,
+          isPartialResponse, includeDuplicateLastRow);
+      if (keysMap == null) {
+        get = createGetRequest(key, startTime, endTime);
+      } else {
+        scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
+            maxResultSize);
+      }
+      // 2. Execute it and collect the cells.
+      fetchIntoResponse(pcapsResponse, get, scan, maxResultSize);
+    } catch (IOException e) {
+      LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
+          + key, e);
+      boolean connectionFailure = e instanceof ZooKeeperConnectionException
+          || e instanceof MasterNotRunningException
+          || e instanceof NoServerForRegionException;
+      if (connectionFailure) {
+        if (get == null && scan == null) {
+          // nothing was built, so there is nothing to retry
+          throw e;
+        }
+        int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
+        LOGGER.info("maxRetryLimit =" + maxRetryLimit);
+        for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
+          LOGGER.info("attempting  =" + attempt);
+          try {
+            // close the existing connection; the next request creates a new
+            // one
+            HBaseConfigurationUtil.closeConnection();
+            fetchIntoResponse(pcapsResponse, get, scan, maxResultSize);
+            break;
+          } catch (IOException ie) {
+            if (attempt == maxRetryLimit) {
+              LOGGER.error("Throwing the exception after retrying "
+                  + maxRetryLimit + " times.", ie);
+              throw e;
+            }
+          }
+        }
+      }
+    }
+    return pcapsResponse;
+  }
+
+  /**
+   * Executes whichever request is non-null and appends its cells to the
+   * response: Get results are added as-is, Scan results go through
+   * addToResponse (timestamp sort plus size check).
+   *
+   * @param pcapsResponse
+   *          the response to append to
+   * @param get
+   *          the Get to run, or null when a Scan is to be run
+   * @param scan
+   *          the Scan to run when 'get' is null
+   * @param maxResultSize
+   *          the max result size
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void fetchIntoResponse(PcapsResponse pcapsResponse, Get get,
+      Scan scan, long maxResultSize) throws IOException {
+    // The execute* helpers look up their own table handle and overwrite the
+    // 'table' argument, so there is no handle to pass in.
+    if (get != null) {
+      for (Cell cell : executeGetRequest(null, get)) {
+        pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
+      }
+      return;
+    }
+    List<Cell> scannedCells = executeScanRequest(null, scan);
+    LOGGER.info("scannedCells size :" + scannedCells.size());
+    addToResponse(pcapsResponse, scannedCells, maxResultSize);
+  }
+
+  /**
+   * Appends the scanned cells to the response, ordered by the cell timestamp
+   * comparator, and marks the response as PARTIAL when it is no longer within
+   * 'maxResultSize'.
+   * <p>
+   * The last row key is taken from the final cell in scan order (before
+   * sorting), so a follow-up request can continue from it. A null or empty
+   * cell list adds nothing and leaves the last row key untouched.
+   * </p>
+   *
+   * @param pcapsResponse
+   *          the pcaps response
+   * @param scannedCells
+   *          the scanned cells; may be null or empty
+   * @param maxResultSize
+   *          the max result size
+   */
+  private void addToResponse(PcapsResponse pcapsResponse,
+      List<Cell> scannedCells, long maxResultSize) {
+    String lastKeyFromCurrentScan = null;
+    if (scannedCells != null && !scannedCells.isEmpty()) {
+      // Bytes.toString decodes as UTF-8 regardless of the platform charset.
+      lastKeyFromCurrentScan = Bytes.toString(CellUtil.cloneRow(scannedCells
+          .get(scannedCells.size() - 1)));
+      Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
+      for (Cell sortedCell : scannedCells) {
+        pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell));
+      }
+    }
+    if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) {
+      // response size reached
+      pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL);
+      if (lastKeyFromCurrentScan != null) {
+        pcapsResponse.setLastRowKey(lastKeyFromCurrentScan);
+      }
+    }
+  }
+
+  /**
+   * Builds the start and stop row keys for a Scan, or returns null when the
+   * key should be read with a Get instead.
+   * <p>
+   * The key is split on the pcap key delimiter ('-'). Assuming
+   * configuredTokensInRowKey=7 and minimumTokensIninputKey=5:
+   * </p>
+   *
+   * <pre>
+   * a) 5 tokens ("srcIp-dstIp-protocol-srcPort-dstPort")
+   *      startKey = "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000"
+   *      stopKey  = "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999"
+   * b) 6 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1")
+   *      startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000"
+   *      stopKey  = "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999"
+   * c) 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2")
+   *      1. key is NOT the last row key of a partial response: returns null
+   *      2. key is the last row key of a partial response:
+   *         startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"
+   *                    (id2 itself when 'includeDuplicateLastRow' is true)
+   *         stopKey  = "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999"
+   * </pre>
+   * <p>
+   * The incremented token keeps the width of the original token (leading
+   * zeros are preserved), so the start key has the same fixed-width layout as
+   * the padded min/max tokens used for the other cases.
+   * </p>
+   *
+   * @param key
+   *          the key
+   * @param isLastRowKey
+   *          if the key is part of partial response
+   * @param includeDuplicateLastRow
+   *          whether the scan should start at the last row key itself rather
+   *          than just after it
+   * @return map holding START_KEY and END_KEY, or null when a Get should be
+   *         used
+   */
+  @VisibleForTesting
+  Map<String, String> createStartAndStopRowKeys(String key,
+      boolean isLastRowKey, boolean includeDuplicateLastRow) {
+    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    String[] keyTokens = key.split("\\" + delimeter);
+
+    int configuredTokensInRowKey = ConfigurationUtil
+        .getConfiguredTokensInRowkey();
+    int minimumTokensIninputKey = ConfigurationUtil
+        .getMinimumTokensInInputkey();
+    Assert
+        .isTrue(
+            minimumTokensIninputKey <= configuredTokensInRowKey,
+            "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key ");
+
+    StringBuilder startKey;
+    StringBuilder endKey;
+    if (keyTokens.length == configuredTokensInRowKey) {
+      // A complete row key that is not a partial response key is read with a
+      // Get; so is one with no appended token slots at all.
+      if (!isLastRowKey || keyTokens.length == minimumTokensIninputKey) {
+        return null;
+      }
+      // Partial response key: the start key repeats the input key (with the
+      // last token incremented unless the last row is to be included again);
+      // the stop key replaces every appended token with the max token.
+      String partialKey = getTokens(key, minimumTokensIninputKey);
+      startKey = new StringBuilder(partialKey);
+      endKey = new StringBuilder(partialKey);
+      for (int i = minimumTokensIninputKey; i < configuredTokensInRowKey; i++) {
+        String token = keyTokens[i];
+        boolean isLastToken = (i == configuredTokensInRowKey - 1);
+        if (isLastToken && !includeDuplicateLastRow) {
+          token = incrementPreservingWidth(token);
+        }
+        startKey.append(delimeter).append(token);
+        endKey.append(delimeter).append(getMaxLimitForAppendingTokens());
+      }
+    } else {
+      // Shorter key: pad the missing tokens with the min/max token values.
+      startKey = new StringBuilder(key);
+      endKey = new StringBuilder(key);
+      for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) {
+        startKey.append(delimeter).append(getMinLimitForAppendingTokens());
+        endKey.append(delimeter).append(getMaxLimitForAppendingTokens());
+      }
+    }
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put(HBaseConfigConstants.START_KEY, startKey.toString());
+    map.put(HBaseConfigConstants.END_KEY, endKey.toString());
+    return map;
+  }
+
+  /**
+   * Adds one to a numeric key token and left-pads the result with zeros to
+   * the width of the original token (e.g. "00041" becomes "00042").
+   *
+   * @param token
+   *          numeric token
+   * @return the incremented token, at least as wide as the input
+   * @throws NumberFormatException
+   *           if the token is not a number
+   */
+  private String incrementPreservingWidth(String token) {
+    long next = Long.parseLong(token) + 1;
+    return StringUtils.leftPad(String.valueOf(next), token.length(), '0');
+  }
+
+  /**
+   * Tells whether the request carries anything to look up: true when 'keys'
+   * is non-empty or 'lastRowKey' is non-empty, false when both are missing.
+   *
+   * @param keys
+   *          input row keys
+   * @param lastRowKey
+   *          partial response key
+   * @return boolean
+   */
+  @VisibleForTesting
+  boolean checkIfValidInput(List<String> keys, String lastRowKey) {
+    return !CollectionUtils.isEmpty(keys)
+        || StringUtils.isNotEmpty(lastRowKey);
+  }
+
+  /**
+   * Executes the given Get request against the configured table and returns
+   * the cells of the configured column family and qualifier.
+   * <p>
+   * The table handle is obtained from the shared connection for this call
+   * only and is closed before returning.
+   * </p>
+   *
+   * @param table
+   *          ignored; kept for signature compatibility (the method always
+   *          looks up its own table handle)
+   * @param get
+   *          Get
+   * @return List<Cell>
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private List<Cell> executeGetRequest(HTable table, Get get)
+      throws IOException {
+    LOGGER.info("Get :" + get.toString());
+    HTable openedTable = (HTable) HBaseConfigurationUtil.getConnection()
+        .getTable(ConfigurationUtil.getTableName());
+    try {
+      Result result = openedTable.get(get);
+      return result.getColumnCells(ConfigurationUtil.getColumnFamily(),
+          ConfigurationUtil.getColumnQualifier());
+    } finally {
+      // release the per-request table handle
+      openedTable.close();
+    }
+  }
+
+  /**
+   * Executes the given Scan against the table named by the
+   * "hbase.table.name" configuration property and collects, row by row, the
+   * cells of the configured column family and qualifier.
+   * <p>
+   * Both the scanner and the table handle are opened for this call only and
+   * are closed before returning, also when the scan fails.
+   * </p>
+   *
+   * @param table
+   *          ignored; kept for signature compatibility (the method always
+   *          looks up its own table handle)
+   * @param scan
+   *          the scan
+   * @return the scanned cells in scan order; never null
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private List<Cell> executeScanRequest(HTable table, Scan scan)
+      throws IOException {
+    LOGGER.info("Scan :" + scan.toString());
+    HTable openedTable = (HTable) HBaseConfigurationUtil.getConnection()
+        .getTable(
+            ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
+    try {
+      ResultScanner resultScanner = openedTable.getScanner(scan);
+      try {
+        List<Cell> scannedCells = new ArrayList<Cell>();
+        for (Result result = resultScanner.next(); result != null; result = resultScanner
+            .next()) {
+          List<Cell> cells = result.getColumnCells(
+              ConfigurationUtil.getColumnFamily(),
+              ConfigurationUtil.getColumnQualifier());
+          if (cells != null) {
+            scannedCells.addAll(cells);
+          }
+        }
+        return scannedCells;
+      } finally {
+        // release the scanner
+        resultScanner.close();
+      }
+    } finally {
+      // release the per-request table handle
+      openedTable.close();
+    }
+  }
+
+  /**
+   * Builds the Get for a complete row key: restricted to the configured
+   * column family and qualifier, limited to the configured number of
+   * versions, and narrowed to the requested time range when one is set.
+   *
+   * @param key
+   *          the row key
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @return the configured Get
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  Get createGetRequest(String key, long startTime, long endTime)
+      throws IOException {
+    final byte[] rowKey = Bytes.toBytes(key);
+    final Get request = new Get(rowKey);
+
+    // column selection: family first, then the single qualifier within it
+    request.addFamily(ConfigurationUtil.getColumnFamily());
+    request.addColumn(ConfigurationUtil.getColumnFamily(),
+        ConfigurationUtil.getColumnQualifier());
+
+    // version limit
+    request.setMaxVersions(ConfigurationUtil.getMaxVersions());
+
+    // optional time range
+    setTimeRangeOnGet(request, startTime, endTime);
+    return request;
+  }
+
+  /**
+   * Builds the Scan between the start and stop row keys held in 'keysMap',
+   * restricted to the configured column family and qualifier.
+   * <p>
+   * The scan's max result size is set to what is left of 'maxResultSize'
+   * after subtracting the current response size and one maximum row size; it
+   * is only set when that remainder is positive.
+   * </p>
+   *
+   * @param pcapsResponse
+   *          the response collected so far (only its size is read)
+   * @param keysMap
+   *          map holding START_KEY and END_KEY
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @param maxResultSize
+   *          the max result size
+   * @return the configured Scan
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  Scan createScanRequest(PcapsResponse pcapsResponse,
+      Map<String, String> keysMap, long startTime, long endTime,
+      long maxResultSize) throws IOException {
+    Scan scan = new Scan();
+    // set column family, qualifier
+    scan.addColumn(ConfigurationUtil.getColumnFamily(),
+        ConfigurationUtil.getColumnQualifier());
+
+    // set start and stop keys; Bytes.toBytes encodes as UTF-8 independent of
+    // the platform charset, matching how createGetRequest encodes row keys
+    scan.setStartRow(Bytes.toBytes(keysMap.get(HBaseConfigConstants.START_KEY)));
+    scan.setStopRow(Bytes.toBytes(keysMap.get(HBaseConfigConstants.END_KEY)));
+
+    // set max results size : remaining size = max results size - ( current
+    // pcaps response size + possible maximum row size)
+    long remainingSize = maxResultSize
+        - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize());
+
+    if (remainingSize > 0) {
+      scan.setMaxResultSize(remainingSize);
+    }
+    // set max versions
+    scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.column.maxVersions"));
+
+    // set time range
+    setTimeRangeOnScan(scan, startTime, endTime);
+    return scan;
+  }
+
+  /**
+   * Applies the requested time range to the scan. Nothing is set when both
+   * bounds are negative; a single negative bound is opened up (0 for the
+   * start, Long.MAX_VALUE for the end) and non-negative bounds are converted
+   * with PcapHelper.convertToDataCreationTimeUnit.
+   *
+   * @param scan
+   *          the scan
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void setTimeRangeOnScan(Scan scan, long startTime, long endTime)
+      throws IOException {
+    if (startTime < 0 && endTime < 0) {
+      return; // no time range requested
+    }
+    final long rangeStart = (startTime < 0) ? 0 : PcapHelper
+        .convertToDataCreationTimeUnit(startTime);
+    final long rangeEnd = (endTime < 0) ? Long.MAX_VALUE : PcapHelper
+        .convertToDataCreationTimeUnit(endTime);
+    Assert.isTrue(rangeStart < rangeEnd,
+        "startTime value must be less than endTime value");
+    scan.setTimeRange(rangeStart, rangeEnd);
+  }
+
+  /**
+   * Applies the requested time range to the get. Nothing is set when both
+   * bounds are negative; a single negative bound is opened up (0 for the
+   * start, Long.MAX_VALUE for the end) and non-negative bounds are converted
+   * with PcapHelper.convertToDataCreationTimeUnit.
+   *
+   * @param get
+   *          the get
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void setTimeRangeOnGet(Get get, long startTime, long endTime)
+      throws IOException {
+    if (startTime < 0 && endTime < 0) {
+      return; // no time range requested
+    }
+    final long rangeStart = (startTime < 0) ? 0 : PcapHelper
+        .convertToDataCreationTimeUnit(startTime);
+    final long rangeEnd = (endTime < 0) ? Long.MAX_VALUE : PcapHelper
+        .convertToDataCreationTimeUnit(endTime);
+    Assert.isTrue(rangeStart < rangeEnd,
+        "startTime value must be less than endTime value");
+    get.setTimeRange(rangeStart, rangeEnd);
+  }
+
+  /**
+   * Builds the smallest appending token: the digit '0' repeated as many times
+   * as the configured number of appending-token digits.
+   *
+   * @return the min limit for appending tokens
+   */
+  private String getMinLimitForAppendingTokens() {
+    final int width = ConfigurationUtil.getAppendingTokenDigits();
+    return StringUtils.repeat("0", width);
+  }
+
+  /**
+   * Builds the largest appending token: the digit '9' repeated as many times
+   * as the configured number of appending-token digits.
+   *
+   * @return the max limit for appending tokens
+   */
+  private String getMaxLimitForAppendingTokens() {
+    final int width = ConfigurationUtil.getAppendingTokenDigits();
+    return StringUtils.repeat("9", width);
+  }
+
+  /**
+   * Command line entry point: fetches the pcaps for a comma separated list of
+   * keys and writes them to a file.
+   * <p>
+   * Arguments as read by this method: args[1] is the output file, args[2] the
+   * comma separated keys, args[3] (optional) the start time and args[4]
+   * (optional) the end time. args[0] is not read here. Prints the usage and
+   * returns when fewer than three arguments are given.
+   * </p>
+   *
+   * @param args
+   *          the arguments
+   *
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static void main(String[] args) throws IOException {
+    // args[2] is read below, so at least three arguments are required
+    if (args == null || args.length < 3) {
+      usage();
+      return;
+    }
+    String outputFileName = args[1];
+    List<String> keys = Arrays.asList(StringUtils.split(args[2], ","));
+    System.out.println("Geting keys " + keys);
+    long startTime = 0;
+    long endTime = Long.MAX_VALUE;
+    if (args.length > 3) {
+      startTime = Long.parseLong(args[3]);
+    }
+    if (args.length > 4) {
+      endTime = Long.parseLong(args[4]);
+    }
+    System.out.println("With start time " + startTime + " and end time "
+        + endTime);
+    PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl();
+    // NOTE(review): the max result size is hard-coded to 6; its unit is not
+    // visible from this class - confirm against PcapsResponse.
+    PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime,
+        false, false, 6);
+    File file = new File(outputFileName);
+    // truncate the file first, then append the pcap bytes
+    FileUtils.write(file, "", false);
+    FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true);
+  }
+
+  /**
+   * Prints the command line usage, listing the arguments in the positions
+   * main() reads them: output file, comma separated keys, then the optional
+   * start and end times.
+   */
+  private static void usage() {
+    System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable
+        // debuggingCode
+        + " <zk quorum> <output file> <comma separated keys> [start time] [end time]");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapHelper.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapHelper.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapHelper.java
new file mode 100644
index 0000000..469974f
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapHelper.java
@@ -0,0 +1,205 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.mortbay.log.Log;
+import org.springframework.util.Assert;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * utility class which holds methods related to time conversions, building
+ * reverse keys.
+ */
+public class PcapHelper {
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger.getLogger(PcapHelper.class);
+
+  /** The cell timestamp comparator. */
+  private static CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator();
+
+  /**
+   * The Enum TimeUnit.
+   */
+  public enum TimeUnit {
+
+    /** The seconds. */
+    SECONDS,
+    /** The millis. */
+    MILLIS,
+    /** The micros. */
+    MICROS,
+    /** The unknown. */
+    UNKNOWN
+  };
+
+  /**
+   * Converts the given time to the 'hbase' data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  public static long convertToDataCreationTimeUnit(long inputTime) {
+    if (inputTime <= 9999999999L) {
+      return convertSecondsToDataCreationTimeUnit(inputTime); // input time unit
+                                                              // is in seconds
+    } else if (inputTime <= 9999999999999L) {
+      return convertMillisToDataCreationTimeUnit(inputTime); // input time unit
+                                                             // is in millis
+    } else if (inputTime <= 9999999999999999L) {
+      return convertMicrosToDataCreationTimeUnit(inputTime); // input time unit
+                                                             // it in micros
+    }
+    return inputTime; // input time unit is unknown
+  }
+
+  /**
+   * Returns the 'hbase' data creation time unit by reading
+   * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If
+   * none is mentioned in properties file, returns <code>TimeUnit.UNKNOWN</code>
+   * 
+   * @return TimeUnit
+   */
+  @VisibleForTesting
+  public static TimeUnit getDataCreationTimeUnit() {
+    String timeUnit = ConfigurationUtil.getConfiguration().getString(
+        "hbase.table.data.time.unit");
+    LOGGER.debug("hbase.table.data.time.unit=" + timeUnit.toString());
+    if (StringUtils.isNotEmpty(timeUnit)) {
+      return TimeUnit.valueOf(timeUnit);
+    }
+    return TimeUnit.UNKNOWN;
+  }
+
+  /**
+   * Convert seconds to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  public static long convertSecondsToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Seconds To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return inputTime;
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime * 1000;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime * 1000 * 1000;
+    }
+    return inputTime;
+  }
+
+  /**
+   * Builds the reverseKey to fetch the pcaps in the reverse traffic
+   * (destination to source).
+   * 
+   * @param key
+   *          indicates hbase rowKey (partial or full) in the format
+   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+   * @return String indicates the key in the format
+   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
+   */
+  public static String reverseKey(String key) {
+    Assert.hasText(key, "key must not be null or empty");
+    String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER;
+    String regex = "\\" + delimeter;
+    StringBuffer sb = new StringBuffer();
+    try {
+      String[] tokens = key.split(regex);
+      Assert
+          .isTrue(
+              (tokens.length == 5 || tokens.length == 6 || tokens.length == 7),
+              "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'");
+      sb.append(tokens[1]).append(delimeter).append(tokens[0])
+          .append(delimeter).append(tokens[2]).append(delimeter)
+          .append(tokens[4]).append(delimeter).append(tokens[3]);
+    } catch (Exception e) {
+      Log.warn("Failed to reverse the key. Reverse scan won't be performed.", e);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Builds the reverseKeys to fetch the pcaps in the reverse traffic
+   * (destination to source). If all keys in the input are not in the expected
+   * format, it returns an empty list;
+   * 
+   * @param keys
+   *          indicates list of hbase rowKeys (partial or full) in the format
+   *          "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment"
+   * @return List<String> indicates the list of keys in the format
+   *         "dstAddr-srcAddr-protocol-dstPort-srcPort"
+   */
+  public static List<String> reverseKey(List<String> keys) {
+    Assert.notEmpty(keys, "'keys' must not be null or empty");
+    List<String> reverseKeys = new ArrayList<String>();
+    for (String key : keys) {
+      if (key != null) {
+        String reverseKey = reverseKey(key);
+        if (StringUtils.isNotEmpty(reverseKey)) {
+          reverseKeys.add(reverseKey);
+        }
+      }
+    }
+    return reverseKeys;
+  }
+
+  /**
+   * Returns Comparator for sorting pcaps cells based on the timestamp (dsc).
+   * 
+   * @return CellTimestampComparator
+   */
+  public static CellTimestampComparator getCellTimestampComparator() {
+    return CELL_TIMESTAMP_COMPARATOR;
+  }
+
+  /**
+   * Convert millis to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  private static long convertMillisToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Millis To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return (inputTime / 1000);
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime * 1000;
+    }
+    return inputTime;
+  }
+
+  /**
+   * Convert micros to data creation time unit.
+   * 
+   * @param inputTime
+   *          the input time
+   * @return the long
+   */
+  @VisibleForTesting
+  private static long convertMicrosToDataCreationTimeUnit(long inputTime) {
+    System.out.println("convert Micros To DataCreation TimeUnit");
+    TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit();
+    if (TimeUnit.SECONDS == dataCreationTimeUnit) {
+      return inputTime / (1000 * 1000);
+    } else if (TimeUnit.MILLIS == dataCreationTimeUnit) {
+      return inputTime / 1000;
+    } else if (TimeUnit.MICROS == dataCreationTimeUnit) {
+      return inputTime;
+    }
+    return inputTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapReceiverImpl.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapReceiverImpl.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapReceiverImpl.java
new file mode 100644
index 0000000..f6eeab2
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapReceiverImpl.java
@@ -0,0 +1,212 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Controller;
+import org.springframework.util.Assert;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import com.cisco.opensoc.pcap.parsing.PcapUtils;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Single point of entry for all REST calls. Exposes methods to fetch pcaps for
+ * the given list of keys or range of keys and optional start time and end time.
+ * If the caller doesn't provide start time and end time, all pcaps from
+ * beginning of the time to until now are returned.
+ *
+ * @author Sayi
+ *
+ */
+@Controller
+public class PcapReceiverImpl implements IPcapReceiver {
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger.getLogger(PcapReceiverImpl.class);
+
+  /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */
+  private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition";
+
+  /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */
+  private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\"";
+
+  /** partial response key header name. */
+  private static final String HEADER_PARTIAL_RESPONSE_KEY = "lastRowKey";
+
+  /**
+   * Fetches the pcaps for the given keys. Responds with '204 No Content' when
+   * no pcaps are found, with '206 Partial Content' plus the 'lastRowKey'
+   * header when the response status is PARTIAL, and with '200 OK' otherwise.
+   *
+   * @param keys
+   *          the keys; each element may itself hold several comma separated
+   *          keys
+   * @param lastRowKey
+   *          passed through to the pcap getter
+   * @param startTime
+   *          the start time, -1 when not supplied
+   * @param endTime
+   *          the end time, -1 when not supplied
+   * @param includeDuplicateLastRow
+   *          passed through to the pcap getter
+   * @param includeReverseTraffic
+   *          passed through to the pcap getter
+   * @param maxResponseSize
+   *          requested maximum response size; validated by ConfigurationUtil
+   * @return the response entity holding the pcaps
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapReceiver
+   */
+  @Override
+  @RequestMapping(value = "/pcapGetter/getPcapsByKeys", produces = "application/octet-stream")
+  public ResponseEntity<byte[]> getPcapsByKeys(
+      @RequestParam(required = false) List<String> keys,
+      @RequestParam(required = false) String lastRowKey,
+      @RequestParam(defaultValue = "-1") long startTime,
+      @RequestParam(defaultValue = "-1") long endTime,
+      @RequestParam(required = false) boolean includeDuplicateLastRow,
+      @RequestParam(defaultValue = "false") boolean includeReverseTraffic,
+      @RequestParam(required = false) String maxResponseSize)
+      throws IOException {
+    Assert.notEmpty(keys, "'keys' must not be null or empty");
+    MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
+    try {
+      IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+      PcapsResponse pcapResponse = pcapGetter.getPcaps(parseKeys(keys),
+          lastRowKey, startTime, endTime, includeReverseTraffic,
+          includeDuplicateLastRow,
+          ConfigurationUtil.validateMaxResultSize(maxResponseSize));
+      // Plain concatenation is null-safe; an explicit toString() here would
+      // throw before the null check below could answer with '204 No Content'.
+      LOGGER.info("pcaps response in REST layer =" + pcapResponse);
+
+      // return http status '204 No Content' if the pcaps response size is 0
+      if (pcapResponse == null || pcapResponse.getResponseSize() == 0) {
+        return new ResponseEntity<byte[]>(HttpStatus.NO_CONTENT);
+      }
+
+      headers.add(HEADER_CONTENT_DISPOSITION_NAME,
+          HEADER_CONTENT_DISPOSITION_VALUE);
+
+      // return http status '206 Partial Content', the partial response file and
+      // 'lastRowKey' header , if the pcaps response status is 'PARTIAL'
+      if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) {
+        headers.add(HEADER_PARTIAL_RESPONSE_KEY, pcapResponse.getLastRowKey());
+        return new ResponseEntity<byte[]>(pcapResponse.getPcaps(), headers,
+            HttpStatus.PARTIAL_CONTENT);
+      }
+
+      // return http status '200 OK' along with the complete pcaps response
+      // file, and headers
+      return new ResponseEntity<byte[]>(pcapResponse.getPcaps(), headers,
+          HttpStatus.OK);
+    } catch (IOException e) {
+      LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
+          + keys.toString(), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Fetches the pcaps for the given range of keys. Responds with '204 No
+   * Content' when the scanner returns null or no bytes, and with '200 OK'
+   * otherwise.
+   *
+   * @param startKey
+   *          the start key; must have text
+   * @param endKey
+   *          the end key, may be omitted
+   * @param maxResponseSize
+   *          requested maximum response size; validated by ConfigurationUtil
+   * @param startTime
+   *          the start time, -1 when not supplied
+   * @param endTime
+   *          the end time, -1 when not supplied
+   * @return the response entity holding the pcaps
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapReceiver
+   */
+  @Override
+  @RequestMapping(value = "/pcapGetter/getPcapsByKeyRange", produces = "application/octet-stream")
+  public ResponseEntity<byte[]> getPcapsByKeyRange(
+      @RequestParam String startKey,
+      @RequestParam(required = false) String endKey,
+      @RequestParam(required = false) String maxResponseSize,
+      @RequestParam(defaultValue = "-1") long startTime,
+      @RequestParam(defaultValue = "-1") long endTime) throws IOException {
+    Assert.hasText(startKey, "'startKey' must not be null or empty");
+    MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
+    try {
+      IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance();
+      byte[] response = pcapScanner.getPcaps(startKey, endKey,
+          ConfigurationUtil.validateMaxResultSize(maxResponseSize), startTime,
+          endTime);
+      if (response == null || response.length == 0) {
+        return new ResponseEntity<byte[]>(HttpStatus.NO_CONTENT);
+      }
+      headers.add(HEADER_CONTENT_DISPOSITION_NAME,
+          HEADER_CONTENT_DISPOSITION_VALUE);
+      // return http status '200 OK' along with the complete pcaps response
+      // file, and headers
+      return new ResponseEntity<byte[]>(response, headers, HttpStatus.OK);
+    } catch (IOException e) {
+      LOGGER.error(
+          "Exception occurred while fetching Pcaps for the key range : startKey="
+              + startKey + ", endKey=" + endKey, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Fetches the pcaps of the session identified by the given addresses,
+   * protocol and ports. Responds with '204 No Content' when no pcaps are
+   * found, and with '200 OK' otherwise.
+   *
+   * @param srcIp
+   *          the source ip; must have text
+   * @param dstIp
+   *          the destination ip; must have text
+   * @param protocol
+   *          the protocol; must have text
+   * @param srcPort
+   *          the source port; must have text
+   * @param dstPort
+   *          the destination port; must have text
+   * @param startTime
+   *          the start time, -1 when not supplied
+   * @param endTime
+   *          the end time, -1 when not supplied
+   * @param includeReverseTraffic
+   *          passed through to the pcap getter
+   * @return the response entity holding the pcaps
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapReceiver
+   */
+  @Override
+  @RequestMapping(value = "/pcapGetter/getPcapsByIdentifiers", produces = "application/octet-stream")
+  public ResponseEntity<byte[]> getPcapsByIdentifiers(
+      @RequestParam String srcIp, @RequestParam String dstIp,
+      @RequestParam String protocol, @RequestParam String srcPort,
+      @RequestParam String dstPort,
+      @RequestParam(defaultValue = "-1") long startTime,
+      @RequestParam(defaultValue = "-1") long endTime,
+      @RequestParam(defaultValue = "false") boolean includeReverseTraffic)
+      throws IOException {
+    Assert.hasText(srcIp, "'srcIp' must not be null or empty");
+    Assert.hasText(dstIp, "'dstIp' must not be null or empty");
+    Assert.hasText(protocol, "'protocol' must not be null or empty");
+    Assert.hasText(srcPort, "'srcPort' must not be null or empty");
+    Assert.hasText(dstPort, "'dstPort' must not be null or empty");
+    MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
+    try {
+      String sessionKey = PcapUtils.getSessionKey(srcIp, dstIp, protocol,
+          srcPort, dstPort);
+      LOGGER.info("sessionKey =" + sessionKey);
+      IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance();
+      PcapsResponse response = pcapGetter.getPcaps(Arrays.asList(sessionKey),
+          null, startTime, endTime, includeReverseTraffic, false,
+          ConfigurationUtil.getDefaultResultSize());
+      if (response == null || response.getResponseSize() == 0) {
+        return new ResponseEntity<byte[]>(HttpStatus.NO_CONTENT);
+      }
+      headers.add(HEADER_CONTENT_DISPOSITION_NAME,
+          HEADER_CONTENT_DISPOSITION_VALUE);
+      // return http status '200 OK' along with the complete pcaps response
+      // file, and headers
+      return new ResponseEntity<byte[]>(response.getPcaps(), headers,
+          HttpStatus.OK);
+    } catch (IOException e) {
+      LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
+          e);
+      throw e;
+    }
+  }
+
+  /**
+   * Splits each value of the list on the delimiter ',' and collects all the
+   * resulting tokens into a new list. Each value is trimmed before it is
+   * split; null values are skipped.
+   *
+   * @param keys
+   *          list of keys to be parsed; must not be null or empty
+   * @return list of keys
+   */
+  @VisibleForTesting
+  List<String> parseKeys(List<String> keys) {
+    Assert.notEmpty(keys);
+    List<String> parsedKeys = new ArrayList<String>();
+    for (String key : keys) {
+      String[] tokens = StringUtils.split(StringUtils.trim(key), ",");
+      // StringUtils.split returns null for a null input; skip it instead of
+      // letting Arrays.asList fail on a null array.
+      if (tokens != null) {
+        parsedKeys.addAll(Arrays.asList(tokens));
+      }
+    }
+    return parsedKeys;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapScannerHBaseImpl.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapScannerHBaseImpl.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapScannerHBaseImpl.java
new file mode 100644
index 0000000..5e0649e
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapScannerHBaseImpl.java
@@ -0,0 +1,302 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.NoServerForRegionException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.springframework.util.Assert;
+
+import com.cisco.opensoc.pcap.parsing.PcapMerger;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Singleton class which integrates with HBase table and returns sorted pcaps
+ * based on the timestamp for the given range of keys. Connections are obtained
+ * through HBaseConfigurationUtil.
+ *
+ * @author sheetal
+ * @version $Revision: 1.0 $
+ */
+public class PcapScannerHBaseImpl implements IPcapScanner {
+
+  /** The Constant LOGGER. */
+  private static final Logger LOGGER = Logger
+      .getLogger(PcapScannerHBaseImpl.class);
+
+  /** The Constant DEFAULT_HCONNECTION_RETRY_LIMIT. */
+  private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0;
+
+  /**
+   * The singleton instance. Declared volatile because {@link #getInstance()}
+   * reads it outside of the synchronized block.
+   */
+  private static volatile IPcapScanner pcapScannerHBase = null;
+
+  /**
+   * Scans the pcaps between startKey and endKey and returns them as a single
+   * byte array. If the first scan fails with a connection related exception,
+   * the connection is closed and the scan is retried up to the configured
+   * retry limit; if no attempt succeeds the first exception is rethrown.
+   *
+   * @param startKey
+   *          the start key; must have text
+   * @param endKey
+   *          the end key, may be null
+   * @param maxResultSize
+   *          the max result size set on the scan
+   * @param startTime
+   *          the start time, negative when not supplied
+   * @param endTime
+   *          the end time, negative when not supplied
+   * @return the only pcap when exactly one was found, otherwise the output of
+   *         PcapMerger for all pcaps found
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
+   *      java.lang.String, long, long, long)
+   */
+  @Override
+  public byte[] getPcaps(String startKey, String endKey, long maxResultSize,
+      long startTime, long endTime) throws IOException {
+    Assert.hasText(startKey, "startKey must no be null or empty");
+    byte[] cf = Bytes.toBytes(ConfigurationUtil.getConfiguration()
+        .getString("hbase.table.column.family"));
+    byte[] cq = Bytes.toBytes(ConfigurationUtil.getConfiguration()
+        .getString("hbase.table.column.qualifier"));
+    // create scan request
+    Scan scan = createScanRequest(cf, cq, startKey, endKey, maxResultSize,
+        startTime, endTime);
+    List<byte[]> pcaps = new ArrayList<byte[]>();
+    try {
+      // scanPcaps opens and closes its own table, so none is handed in.
+      pcaps = scanPcaps(pcaps, null, scan, cf, cq);
+    } catch (IOException e) {
+      LOGGER.error(
+          "Exception occurred while fetching Pcaps for the key range : startKey="
+              + startKey + ", endKey=" + endKey, e);
+      if (!isConnectionFailure(e)) {
+        throw e;
+      }
+      pcaps = retryScan(pcaps, scan, cf, cq, e);
+    }
+    if (pcaps.size() == 1) {
+      return pcaps.get(0);
+    }
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PcapMerger.merge(baos, pcaps);
+    return baos.toByteArray();
+  }
+
+  /**
+   * Tells whether the exception is one of the connection related failures for
+   * which a scan is retried on a fresh connection.
+   *
+   * @param e
+   *          the exception raised by the scan
+   * @return true, if the scan should be retried
+   */
+  private static boolean isConnectionFailure(IOException e) {
+    return e instanceof ZooKeeperConnectionException
+        || e instanceof MasterNotRunningException
+        || e instanceof NoServerForRegionException;
+  }
+
+  /**
+   * Closes the existing connection and repeats the scan, up to the configured
+   * retry limit.
+   *
+   * @param pcaps
+   *          the list the scanned pcaps are added to
+   * @param scan
+   *          the scan
+   * @param cf
+   *          the cf
+   * @param cq
+   *          the cq
+   * @param firstFailure
+   *          the exception raised by the initial scan
+   * @return the list of scanned pcaps
+   * @throws IOException
+   *           the initial failure, if no retry succeeds; this includes the
+   *           case of a retry limit of zero, which would otherwise hide the
+   *           failure from the caller
+   */
+  private List<byte[]> retryScan(List<byte[]> pcaps, Scan scan, byte[] cf,
+      byte[] cq, IOException firstFailure) throws IOException {
+    int maxRetryLimit = getConnectionRetryLimit();
+    for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
+      try {
+        // closing the existing connection and retry, it will create a new
+        // HConnection
+        HBaseConfigurationUtil.closeConnection();
+        return scanPcaps(pcaps, null, scan, cf, cq);
+      } catch (IOException ie) {
+        LOGGER.warn("Retry attempt " + attempt + " of " + maxRetryLimit
+            + " failed.", ie);
+      }
+    }
+    LOGGER.error("Throwing the exception after retrying " + maxRetryLimit
+        + " times.");
+    throw firstFailure;
+  }
+
+  /**
+   * Creates the scan request. A time range is only set when at least one of
+   * startTime and endTime is not negative; a negative startTime then becomes 0
+   * and a negative endTime becomes Long.MAX_VALUE, while non negative values
+   * are converted to the data creation time unit.
+   *
+   * @param cf
+   *          the cf
+   * @param cq
+   *          the cq
+   * @param startKey
+   *          the start key
+   * @param endKey
+   *          the end key, may be null
+   * @param maxResultSize
+   *          the max result size
+   * @param startTime
+   *          the start time
+   * @param endTime
+   *          the end time
+   * @return the scan
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @throws IllegalArgumentException
+   *           if a time range is requested and the start time is not less than
+   *           the end time
+   */
+  @VisibleForTesting
+  Scan createScanRequest(byte[] cf, byte[] cq, String startKey, String endKey,
+      long maxResultSize, long startTime, long endTime) throws IOException {
+    Scan scan = new Scan();
+    scan.addColumn(cf, cq);
+    scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt(
+        "hbase.table.column.maxVersions"));
+    // Bytes.toBytes encodes as UTF-8; String.getBytes() would depend on the
+    // platform default charset.
+    scan.setStartRow(Bytes.toBytes(startKey));
+    if (endKey != null) {
+      scan.setStopRow(Bytes.toBytes(endKey));
+    }
+    scan.setMaxResultSize(maxResultSize);
+
+    boolean setTimeRange = startTime >= 0 || endTime >= 0;
+    if (setTimeRange) {
+      long rangeStart = startTime < 0 ? 0 : PcapHelper
+          .convertToDataCreationTimeUnit(startTime);
+      long rangeEnd = endTime < 0 ? Long.MAX_VALUE : PcapHelper
+          .convertToDataCreationTimeUnit(endTime);
+      Assert.isTrue(rangeStart < rangeEnd,
+          "startTime value must be less than endTime value");
+      scan.setTimeRange(rangeStart, rangeEnd);
+    }
+    return scan;
+  }
+
+  /**
+   * Runs the scan against the configured table, sorts the scanned cells with
+   * the PcapHelper cell timestamp comparator and adds a copy of each cell
+   * value to the given list. The scanner and the table opened here are closed
+   * before returning.
+   *
+   * @param pcaps
+   *          the list the scanned pcaps are added to
+   * @param table
+   *          not used; the table is always looked up from the connection
+   * @param scan
+   *          the scan
+   * @param cf
+   *          the cf
+   * @param cq
+   *          the cq
+   * @return the given list
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  @VisibleForTesting
+  List<byte[]> scanPcaps(List<byte[]> pcaps, HTable table, Scan scan,
+      byte[] cf, byte[] cq) throws IOException {
+    LOGGER.info("Scan =" + scan.toString());
+    // Assigning to the 'table' parameter would not be visible to the caller,
+    // so the table has to be closed here.
+    HTable scanTable = (HTable) HBaseConfigurationUtil.getConnection()
+        .getTable(
+            ConfigurationUtil.getConfiguration().getString("hbase.table.name"));
+    try {
+      List<Cell> scannedCells = new ArrayList<Cell>();
+      ResultScanner resultScanner = scanTable.getScanner(scan);
+      try {
+        for (Result result = resultScanner.next(); result != null; result = resultScanner
+            .next()) {
+          List<Cell> cells = result.getColumnCells(cf, cq);
+          if (cells != null) {
+            scannedCells.addAll(cells);
+          }
+        }
+      } finally {
+        resultScanner.close();
+      }
+      Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator());
+      LOGGER.info("sorted cells :" + scannedCells.toString());
+      for (Cell sortedCell : scannedCells) {
+        pcaps.add(CellUtil.cloneValue(sortedCell));
+      }
+      return pcaps;
+    } finally {
+      scanTable.close();
+    }
+  }
+
+  /**
+   * Gets the connection retry limit from the
+   * 'hbase.hconnection.retries.number' property, falling back to
+   * DEFAULT_HCONNECTION_RETRY_LIMIT.
+   *
+   * @return the connection retry limit
+   */
+  private int getConnectionRetryLimit() {
+    return ConfigurationUtil.getConfiguration().getInt(
+        "hbase.hconnection.retries.number", DEFAULT_HCONNECTION_RETRY_LIMIT);
+  }
+
+  /**
+   * Scans the pcaps between startKey and endKey using the default result size
+   * and no time range.
+   *
+   * @param startKey
+   *          the start key; must have text
+   * @param endKey
+   *          the end key; must have text
+   * @return the pcaps
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String,
+   *      java.lang.String)
+   */
+  @Override
+  public byte[] getPcaps(String startKey, String endKey) throws IOException {
+    Assert.hasText(startKey, "startKey must no be null or empty");
+    Assert.hasText(endKey, "endKey must no be null or empty");
+    return getPcaps(startKey, endKey, ConfigurationUtil.getDefaultResultSize(),
+        -1, -1);
+  }
+
+  /**
+   * Always returns the singleton instance, creating it on first use.
+   *
+   * @return IPcapScanner singleton instance
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public static IPcapScanner getInstance() throws IOException {
+    if (pcapScannerHBase == null) {
+      synchronized (PcapScannerHBaseImpl.class) {
+        if (pcapScannerHBase == null) {
+          pcapScannerHBase = new PcapScannerHBaseImpl();
+        }
+      }
+    }
+    return pcapScannerHBase;
+  }
+
+  /**
+   * Instantiates a new pcap scanner h base impl. Private so that instances are
+   * only created through {@link #getInstance()}.
+   */
+  private PcapScannerHBaseImpl() {
+  }
+
+  // Disabled command line entry point, kept for reference.
+  //
+  // public static void main(String[] args) throws IOException {
+  // if (args == null || args.length < 3) {
+  // usage();
+  // return;
+  // }
+  // String outputFileName = null;
+  // String startKey = null;
+  // String stopKey = null;
+  // outputFileName = args[0];
+  // startKey = args[1];
+  // if (args.length > 2) { // NOPMD by sheetal on 1/29/14 3:55 PM
+  // stopKey = args[2];
+  // }
+  // PcapScannerHBaseImpl downloader = new PcapScannerHBaseImpl();
+  // byte[] pcaps = downloader.getPcaps(startKey, stopKey, defaultResultSize, 0,
+  // Long.MAX_VALUE);
+  // File file = new File(outputFileName);
+  // FileUtils.write(file, "", false);
+  // ByteArrayOutputStream baos = new ByteArrayOutputStream(); //
+  // $codepro.audit.disable
+  // // closeWhereCreated
+  // PcapMerger.merge(baos, pcaps);
+  // FileUtils.writeByteArrayToFile(file, baos.toByteArray(), true);
+  // }
+
+  /**
+   * Prints the command line syntax. Not called from any active code in this
+   * class.
+   */
+  @SuppressWarnings("unused")
+  private static void usage() {
+    System.out.println("java " + PcapScannerHBaseImpl.class.getName() // NOPMD by sheetal on 1/29/14 3:55 PM
+        // $codepro.audit.disable debuggingCode
+        + " <zk quorum> <output file> <start key> [stop key]");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapsResponse.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapsResponse.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapsResponse.java
new file mode 100644
index 0000000..a8c8d1b
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/PcapsResponse.java
@@ -0,0 +1,151 @@
+/**
+ * 
+ */
+package com.cisco.opensoc.hbase.client;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.cisco.opensoc.pcap.parsing.PcapMerger;
+
+/**
+ * Value holder for a pcaps result: the pcaps data itself, the response status
+ * and the partial response key.
+ *
+ * @author Sayi
+ */
+public class PcapsResponse {
+
+  /**
+   * Completion state of a response.
+   */
+  public enum Status {
+
+    /** The partial. */
+    PARTIAL,
+    /** The complete. */
+    COMPLETE
+  }
+
+  /** response of the processed keys. */
+  private List<byte[]> pcaps = new ArrayList<byte[]>();
+
+  /** partial response key. */
+  private String lastRowKey;
+
+  /** The status; COMPLETE unless changed through the setter. */
+  private Status status = Status.COMPLETE;
+
+  /**
+   * Replaces the list of pcaps held by this response.
+   *
+   * @param pcaps
+   *          the new pcaps
+   */
+  public void setPcaps(List<byte[]> pcaps) {
+    this.pcaps = pcaps;
+  }
+
+  /**
+   * Appends one entry to the list of pcaps held by this response.
+   *
+   * @param pcaps
+   *          the pcaps to append
+   */
+  public void addPcaps(byte[] pcaps) {
+    this.pcaps.add(pcaps);
+  }
+
+  /**
+   * Gets the partial response key.
+   *
+   * @return the partial response key
+   */
+  public String getLastRowKey() {
+    return this.lastRowKey;
+  }
+
+  /**
+   * Sets the partial response key.
+   *
+   * @param lastRowKey
+   *          the last row key
+   */
+  public void setLastRowKey(String lastRowKey) {
+    this.lastRowKey = lastRowKey;
+  }
+
+  /**
+   * Gets the status.
+   *
+   * @return the status
+   */
+  public Status getStatus() {
+    return this.status;
+  }
+
+  /**
+   * Sets the status.
+   *
+   * @param status
+   *          the new status
+   */
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Checks whether the current response size is below the given limit reduced
+   * by the maximum row size reported by ConfigurationUtil.
+   *
+   * @param maxResultSize
+   *          the max result size
+   * @return true, if the response size is within the limit
+   */
+  public boolean isResonseSizeWithinLimit(long maxResultSize) {
+    long currentSize = getResponseSize();
+    return currentSize < (maxResultSize - ConfigurationUtil.getMaxRowSize());
+  }
+
+  /**
+   * Gets the response size, i.e. the summed length of all pcaps held.
+   *
+   * @return the response size
+   */
+  public long getResponseSize() {
+    long total = 0;
+    for (byte[] entry : this.pcaps) {
+      total += entry.length;
+    }
+    return total;
+  }
+
+  /**
+   * Gets the pcaps as one byte array. A single entry is returned as is; any
+   * other number of entries is passed through PcapMerger.
+   *
+   * @return the pcaps
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  public byte[] getPcaps() throws IOException {
+    if (pcaps.size() != 1) {
+      ByteArrayOutputStream merged = new ByteArrayOutputStream();
+      PcapMerger.merge(merged, pcaps);
+      return merged.toByteArray();
+    }
+    return pcaps.get(0);
+  }
+
+  /**
+   * Describes this response by its last row key, status and response size.
+   *
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("PcapsResponse [lastRowKey=");
+    text.append(lastRowKey);
+    text.append(", status=").append(status);
+    text.append(", pcapsSize=").append(getResponseSize());
+    text.append("]");
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/RestTestingUtil.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/RestTestingUtil.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/RestTestingUtil.java
new file mode 100644
index 0000000..f8e82d3
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/java/com/cisco/opensoc/hbase/client/RestTestingUtil.java
@@ -0,0 +1,238 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * Command-line smoke test that calls the pcapGetter REST endpoints.
+ */
+public class RestTestingUtil {
+
+  /** Host (and port) of the REST service; main sets it from -DhostName. */
+  public static String hostName = null;
+
+  /**
+   * Calls the getPcapsByKeys endpoint four times and prints each response:
+   * defaults; plus reverse traffic; plus the -DstartTime/-DendTime range;
+   * plus the -DmaxResponseSize limit (each call keeps the earlier settings).
+   *
+   * @param keys
+   *          value sent as the 'keys' query parameter
+   */
+  private static void getPcapsByKeys(String keys) {
+    System.out
+        .println("**********************getPcapsByKeys ******************************************************************************************");
+    // 1.
+    String url = "http://" + hostName
+        + "/cisco-rest/pcapGetter/getPcapsByKeys?keys={keys}"
+        + "&includeReverseTraffic={includeReverseTraffic}"
+        + "&startTime={startTime}" + "&endTime={endTime}"
+        + "&maxResponseSize={maxResponseSize}";
+    // default values
+    String startTime = "-1";
+    String endTime = "-1";
+    String maxResponseSize = "6";
+    String includeReverseTraffic = "false";
+
+    // values substituted for the {placeholders} in the URL template
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("keys", keys);
+    map.put("includeReverseTraffic", includeReverseTraffic);
+    map.put("startTime", startTime);
+    map.put("endTime", endTime);
+    map.put("maxResponseSize", maxResponseSize);
+
+    RestTemplate template = new RestTemplate();
+
+    // set headers and entity to send
+    HttpHeaders headers = new HttpHeaders();
+    headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE);
+    HttpEntity<Object> requestEntity = new HttpEntity<Object>(headers);
+
+    // 1.
+    ResponseEntity<byte[]> response1 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeys : request= <keys=%s; includeReverseTraffic=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            keys, includeReverseTraffic, startTime, endTime, maxResponseSize,
+            response1);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 2. with reverse traffic
+    includeReverseTraffic = "true";
+    map.put("includeReverseTraffic", includeReverseTraffic);
+    ResponseEntity<byte[]> response2 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeys : request= <keys=%s; includeReverseTraffic=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            keys, includeReverseTraffic, startTime, endTime, maxResponseSize,
+            response2);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 3.with time range
+    startTime = System.getProperty("startTime", "-1");
+    endTime = System.getProperty("endTime", "-1");
+    map.put("startTime", startTime);
+    map.put("endTime", endTime);
+    ResponseEntity<byte[]> response3 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeys : request= <keys=%s; includeReverseTraffic=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            keys, includeReverseTraffic, startTime, endTime, maxResponseSize,
+            response3);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 4.with maxResponseSize
+    maxResponseSize = System.getProperty("maxResponseSize", "6");
+    map.put("maxResponseSize", maxResponseSize);
+    ResponseEntity<byte[]> response4 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeys : request= <keys=%s; includeReverseTraffic=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            keys, includeReverseTraffic, startTime, endTime, maxResponseSize,
+            response4);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+  }
+
+  /**
+   * Calls the getPcapsByKeyRange endpoint three times and prints each
+   * response: defaults; plus the -DstartTime/-DendTime range; plus the
+   * -DmaxResponseSize limit (each call keeps the earlier settings).
+   *
+   * @param startKey
+   *          value sent as the 'startKey' query parameter
+   * @param endKey
+   *          value sent as the 'endKey' query parameter
+   */
+  private static void getPcapsByKeysRange(String startKey, String endKey) {
+    System.out
+        .println("**********************getPcapsByKeysRange ******************************************************************************************");
+    // 1.
+    String url = "http://" + hostName
+        + "/cisco-rest/pcapGetter/getPcapsByKeyRange?startKey={startKey}"
+        + "&endKey={endKey}" + "&startTime={startTime}" + "&endTime={endTime}"
+        + "&maxResponseSize={maxResponseSize}";
+    // default values
+    String startTime = "-1";
+    String endTime = "-1";
+    String maxResponseSize = "6";
+    // values substituted for the {placeholders} in the URL template
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("startKey", startKey);
+    map.put("endKey", endKey);
+    map.put("startTime", startTime);
+    map.put("endTime", endTime);
+    map.put("maxResponseSize", maxResponseSize);
+
+    RestTemplate template = new RestTemplate();
+
+    // set headers and entity to send
+    HttpHeaders headers = new HttpHeaders();
+    headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE);
+    HttpEntity<Object> requestEntity = new HttpEntity<Object>(headers);
+
+    // 1.
+    ResponseEntity<byte[]> response1 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeysRange : request= <startKey=%s; endKey=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            startKey, endKey, startTime, endTime, maxResponseSize, response1);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 2. with time range
+    startTime = System.getProperty("startTime", "-1");
+    endTime = System.getProperty("endTime", "-1");
+    map.put("startTime", startTime);
+    map.put("endTime", endTime);
+    ResponseEntity<byte[]> response2 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeysRange : request= <startKey=%s; endKey=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            startKey, endKey, startTime, endTime, maxResponseSize, response2);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+    // 3. with maxResponseSize
+    maxResponseSize = System.getProperty("maxResponseSize", "6");
+    map.put("maxResponseSize", maxResponseSize);
+    ResponseEntity<byte[]> response3 = template.exchange(url, HttpMethod.GET,
+        requestEntity, byte[].class, map);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out
+        .format(
+            "getPcapsByKeysRange : request= <startKey=%s; endKey=%s; startTime=%s; endTime=%s; maxResponseSize=%s> \n response= %s \n",
+            startKey, endKey, startTime, endTime, maxResponseSize, response3);
+    System.out
+        .println("----------------------------------------------------------------------------------------------------");
+    System.out.println();
+
+  }
+
+  /**
+   * Reads the system properties listed below and runs both endpoint checks.
+   *
+   * @param args
+   *          not used
+   */
+  public static void main(String[] args) {
+
+    /*
+     * Run this program with system properties
+     * 
+     * -DhostName=mon.hw.com:8090
+     * -Dkeys=18800006-1800000b-06-0019-b39d,18800006-
+     * 1800000b-06-0050-5af6-64840-40785
+     * -DstartKey=18000002-18800002-06-0436-0019-2440-34545
+     * -DendKey=18000002-18800002-06-b773-0019-2840-34585
+     */
+
+    hostName = System.getProperty("hostName");
+
+    String keys = System.getProperty("keys");
+
+    String startKey = System.getProperty("startKey");
+    String endKey = System.getProperty("endKey");
+
+    getPcapsByKeys(keys);
+    getPcapsByKeysRange(startKey, endKey);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/config-definition-hbase.xml
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/config-definition-hbase.xml b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/config-definition-hbase.xml
new file mode 100644
index 0000000..efe05e8
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/config-definition-hbase.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+
+<configuration>
+	<header>
+		<result delimiterParsingDisabled="true" forceReloadCheck="true"></result>
+		<lookups>
+      		<lookup config-prefix="expr"
+              	config-class="org.apache.commons.configuration.interpol.ExprLookup">
+        		<variables>
+          			<variable name="System" value="Class:java.lang.System"/>
+          			<variable name="net" value="Class:java.net.InetAddress"/>
+          			<variable name="String" value="Class:org.apache.commons.lang.StringUtils"/>
+        		</variables>
+      		</lookup>
+    	</lookups>
+	</header>
+	<override>
+		<!-- 1. properties from 'hbase-config.properties' are loaded first; 
+				if a property is not present in this file, then it will search in the files in the order they are defined here.
+		     2. 'refreshDelay' indicates the minimum delay in milliseconds between checks to see if the underlying file is changed.
+		     3. 'config-optional' indicates this file is not required --> 
+		
+		<properties fileName="${expr:System.getProperty('configPath')+'/hbase-config.properties'}"  config-optional="true">
+			<reloadingStrategy refreshDelay="${expr:System.getProperty('configRefreshDelay')}"
+	      config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
+	     </properties>
+		
+		<properties fileName="hbase-config-default.properties" config-optional="true">
+<!-- 					<reloadingStrategy refreshDelay="${expr:System.getProperty('defaultConfigRefreshDelay')}"
+	      config-class="org.apache.commons.configuration.reloading.FileChangedReloadingStrategy"/>
+ -->	     </properties>
+		
+	</override>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/hbase-config-default.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/hbase-config-default.properties b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/hbase-config-default.properties
new file mode 100644
index 0000000..e9924ee
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/hbase-config-default.properties
@@ -0,0 +1,40 @@
+#hbase zoo keeper configuration
+hbase.zookeeper.quorum=zkpr1,zkpr2,zkpr3
+hbase.zookeeper.clientPort=2181
+hbase.client.retries.number=1
+zookeeper.session.timeout=60000
+zookeeper.recovery.retry=0
+
+#hbase table configuration
+hbase.table.name=pcap
+hbase.table.column.family=t
+hbase.table.column.qualifier=pcap
+hbase.table.column.maxVersions=5
+
+# scan size limit configuration in MB or KB; if the input is negative or greater than max value throw an error.
+hbase.scan.result.size.unit=MB
+hbase.scan.default.result.size=6
+hbase.scan.max.result.size=60
+
+# time stamp conversion configuration; possible values 'SECONDS'(seconds), 'MILLIS'(milli seconds), 'MICROS' (micro seconds)
+hbase.table.data.time.unit=MICROS
+
+#number of retries in case of ZooKeeper or HBase server down
+hbase.hconnection.retries.number=3
+
+#configuration for including pcaps in the reverse traffic
+pcaps.include.reverse.traffic = false
+
+#maximum table row size in KB or MB 
+hbase.table.row.size.unit = KB
+hbase.table.max.row.size = 70
+
+# tokens of row key configuration
+hbase.table.row.key.tokens=7
+rest.api.input.key.min.tokens=5
+
+# whether or not to include the last row from the previous request, applicable for only partial response scenario
+hbase.table.scan.include.duplicate.lastrow= true
+
+#number of digits for appending tokens of the row key
+hbase.table.row.key.token.appending.digits=5

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/log4j.properties b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/log4j.properties
new file mode 100644
index 0000000..0b6ca10
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Root logger option
+log4j.rootLogger=TRACE,file,stdout
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/hbase/cisco-hbase.log
+log4j.appender.file.MaxFileSize=1MB
+log4j.appender.file.MaxBackupIndex=1
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+
+# Direct log messages to console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+
+log4j.logger.backtype.storm=DEBUG
+log4j.logger.clojure.tools=DEBUG
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/CellTimestampComparatorTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/CellTimestampComparatorTest.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/CellTimestampComparatorTest.java
new file mode 100644
index 0000000..639af33
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/CellTimestampComparatorTest.java
@@ -0,0 +1,92 @@
+package com.cisco.opensoc.hbase.client;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hbase.Cell;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.cisco.opensoc.hbase.client.CellTimestampComparator;
+
+/**
+ * Checks the ordering that CellTimestampComparator gives to mocked cells.
+ */
+public class CellTimestampComparatorTest {
+
+  /**
+   * No fixture is needed; kept as an empty placeholder.
+   *
+   * @throws Exception
+   *           never thrown by this empty body
+   */
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  /**
+   * Nothing to clean up; kept as an empty placeholder.
+   *
+   * @throws Exception
+   *           never thrown by this empty body
+   */
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * The cell with the larger timestamp is expected to compare as -1.
+   */
+  @Test
+  public void test_less() {
+    // mocking
+    Cell cell1 = Mockito.mock(Cell.class);
+    Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
+    Cell cell2 = Mockito.mock(Cell.class);
+    Mockito.when(cell2.getTimestamp()).thenReturn(13845345808L);
+
+    CellTimestampComparator comparator = new CellTimestampComparator();
+
+    // assertEquals reports the actual value if the comparison is wrong
+    Assert.assertEquals(-1, comparator.compare(cell1, cell2));
+
+  }
+
+  /**
+   * The cell with the smaller timestamp is expected to compare as 1.
+   */
+  @Test
+  public void test_greater() {
+    // mocking
+    Cell cell1 = Mockito.mock(Cell.class);
+    Mockito.when(cell1.getTimestamp()).thenReturn(13745345808L);
+    Cell cell2 = Mockito.mock(Cell.class);
+    Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
+
+    CellTimestampComparator comparator = new CellTimestampComparator();
+
+    // assertEquals reports the actual value if the comparison is wrong
+    Assert.assertEquals(1, comparator.compare(cell1, cell2));
+
+  }
+
+  /**
+   * Cells with identical timestamps are expected to compare as 0.
+   */
+  @Test
+  public void test_equal() {
+    // mocking
+    Cell cell1 = Mockito.mock(Cell.class);
+    Mockito.when(cell1.getTimestamp()).thenReturn(13945345808L);
+    Cell cell2 = Mockito.mock(Cell.class);
+    Mockito.when(cell2.getTimestamp()).thenReturn(13945345808L);
+
+    CellTimestampComparator comparator = new CellTimestampComparator();
+
+    // assertEquals reports the actual value if the comparison is wrong
+    Assert.assertEquals(0, comparator.compare(cell1, cell2));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/ConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/ConfigurationUtilTest.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/ConfigurationUtilTest.java
new file mode 100644
index 0000000..48f3973
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/ConfigurationUtilTest.java
@@ -0,0 +1,50 @@
+package com.cisco.opensoc.hbase.client;
+
+import org.eclipse.jdt.internal.core.Assert;
+import org.junit.Test;
+
+import com.cisco.opensoc.hbase.client.ConfigurationUtil;
+import com.cisco.opensoc.hbase.client.ConfigurationUtil.SizeUnit;
+
+/**
+ * Checks the size settings that ConfigurationUtil reports.
+ */
+public class ConfigurationUtilTest {
+
+  /**
+   * Expects the maximum result size to be 62914560 (60 * 1024 * 1024).
+   */
+  @Test
+  public void test_getMaxAllowableResultSizeInBytes() {
+    long result = ConfigurationUtil.getMaxResultSize();
+    org.junit.Assert.assertEquals(62914560L, result);
+  }
+
+  /**
+   * Expects the result size unit to be MB.
+   */
+  @Test
+  public void test_getMaxAllowableResultsSizeUnit() {
+    SizeUnit result = ConfigurationUtil.getResultSizeUnit();
+    org.junit.Assert.assertSame(SizeUnit.MB, result);
+  }
+
+  /**
+   * Expects the maximum row size to be 71680 (70 * 1024).
+   */
+  @Test
+  public void test_getMaxRowSizeInBytes() {
+    long result = ConfigurationUtil.getMaxRowSize();
+    org.junit.Assert.assertEquals(71680L, result);
+  }
+
+  /**
+   * Expects the row size unit to be KB.
+   */
+  @Test
+  public void test_getMaxRowSizeUnit() {
+    SizeUnit result = ConfigurationUtil.getRowSizeUnit();
+    org.junit.Assert.assertSame(SizeUnit.KB, result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseConfigurationUtilTest.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseConfigurationUtilTest.java
new file mode 100644
index 0000000..e8ec8f9
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseConfigurationUtilTest.java
@@ -0,0 +1,52 @@
+package com.cisco.opensoc.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.util.Assert;
+
+import com.cisco.opensoc.hbase.client.HBaseConfigurationUtil;
+
+/**
+ * Checks the configuration returned by HBaseConfigurationUtil.read().
+ */
+public class HBaseConfigurationUtilTest {
+
+  /**
+   * No fixture is needed; kept as an empty placeholder.
+   *
+   * @throws Exception
+   *           never thrown by this empty body
+   */
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  /**
+   * Nothing to clean up; kept as an empty placeholder.
+   *
+   * @throws Exception
+   *           never thrown by this empty body
+   */
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  /**
+   * Expects a non-null configuration whose 'hbase.client.retries.number'
+   * is "1"; a missing key fails the assertion instead of raising an NPE.
+   *
+   * @throws IOException
+   *           presumably thrown by HBaseConfigurationUtil.read()
+   */
+  @Test
+  public void test_read() throws IOException {
+    Configuration configuration = HBaseConfigurationUtil.read();
+    org.junit.Assert.assertNotNull("Configuration must not be null", configuration);
+    org.junit.Assert.assertEquals("value must be equal", "1",
+        configuration.get("hbase.client.retries.number"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/05e188ba/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseIntegrationTest.java
----------------------------------------------------------------------
diff --git a/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseIntegrationTest.java b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseIntegrationTest.java
new file mode 100644
index 0000000..3eb2bb0
--- /dev/null
+++ b/opensoc-streaming/OpenSOC-PCAP_Reconstruction/hbase/src/test/java/com/cisco/opensoc/hbase/client/HBaseIntegrationTest.java
@@ -0,0 +1,74 @@
+/**
+ * 
+ */
+package com.cisco.opensoc.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * The Class HBaseIntegrationTest.
+ * 
+ * @author Sayi
+ */
+public class HBaseIntegrationTest {
+
+  /** The test util. */
+  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+
+  /** The test table. */
+  private HTable testTable;
+
+  /**
+   * Inits the cluster.
+   * 
+   * @throws Exception
+   *           the exception
+   */
+  void initCluster() throws Exception {
+    // testUtil.getConfiguration().addResource("hbase-site-local.xml");
+    // testUtil.getConfiguration().reloadConfiguration();
+    // start mini hbase cluster
+    testUtil.startMiniCluster(1);
+    // create tables
+    createTable();
+
+  }
+
+  /**
+   * Creates the table.
+   * 
+   * @throws IOException
+   *           Signals that an I/O exception has occurred.
+   */
+  private void createTable() throws IOException {
+    testTable = testUtil.createTable("test_pcaps_local", "cf");
+    System.out.println("after 'test_pcaps_local' table creation ");
+    // create put
+    Put put = new Put(Bytes.toBytes("1111")); // row key =1111
+    put.add(Bytes.toBytes("cf"), Bytes.toBytes("packet"),
+        Bytes.toBytes("aaaaaaaa"));
+    testTable.put(put);
+    System.out.println("after testTable.put(put)");
+
+  }
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   * @throws Exception
+   *           the exception
+   */
+  public static void main(String[] args) throws Exception {
+    // HBaseIntegrationTest test = new HBaseIntegrationTest();
+    // test.initCluster();
+
+  }
+
+}


Mime
View raw message