From: cestella@apache.org
To: commits@metron.incubator.apache.org
Date: Tue, 26 Jan 2016 14:18:42 -0000
Subject: [64/89] [abbrv] incubator-metron git commit: Move all com/apache folders to org/apache

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java new file mode 100644 index 0000000..6061a5d --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/HBaseConfigurationUtil.java @@ -0,0 +1,165 @@ +/** + * + */ +package com.apache.metron.pcapservice; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; + +/** + * Utility class which creates HConnection instance when the first request is + * received and registers a shut down hook which closes the connection when the + * JVM exits. Creates new connection to the cluster only if the existing + * connection is closed for unknown reasons. Also creates Configuration with + * HBase resources using configuration properties. + * + * @author Sayi + * + */ +public class HBaseConfigurationUtil { + + /** The Constant LOGGER. */ + private static final Logger LOGGER = Logger + .getLogger(HBaseConfigurationUtil.class); + + /** Configuration which holds all HBase properties. */ + private static Configuration config; + + /** + * A cluster connection which knows how to find master node and locate regions + * on the cluster. + */ + private static HConnection clusterConnection = null; + + /** + * Creates HConnection instance when the first request is received and returns + * the same instance for all subsequent requests if the connection is still + * open. + * + * @return HConnection instance + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public static HConnection getConnection() throws IOException { + if (!connectionAvailable()) { + synchronized (HBaseConfigurationUtil.class) { + createClusterConncetion(); + } + } + return clusterConnection; + } + + /** + * Creates the cluster conncetion. + * + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private static void createClusterConncetion() throws IOException { + try { + if (connectionAvailable()) { + return; + } + clusterConnection = HConnectionManager.createConnection(read()); + addShutdownHook(); + System.out.println("Created HConnection and added shutDownHook"); + } catch (IOException e) { + LOGGER + .error( + "Exception occurred while creating HConnection using HConnectionManager", + e); + throw e; + } + } + + /** + * Connection available. + * + * @return true, if successful + */ + private static boolean connectionAvailable() { + if (clusterConnection == null) { + System.out.println("clusterConnection=" + clusterConnection); + return false; + } + System.out.println("clusterConnection.isClosed()=" + + clusterConnection.isClosed()); + return clusterConnection != null && !clusterConnection.isClosed(); + } + + /** + * Adds the shutdown hook. + */ + private static void addShutdownHook() { + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + public void run() { + System.out + .println("Executing ShutdownHook HBaseConfigurationUtil : Closing HConnection"); + try { + clusterConnection.close(); + } catch (IOException e) { + Log.debug("Caught ignorable exception ", e); + } + } + }, "HBaseConfigurationUtilShutDown")); + } + + /** + * Closes the underlying connection to cluster; ignores if any exception is + * thrown. + */ + public static void closeConnection() { + if (clusterConnection != null) { + try { + clusterConnection.close(); + } catch (IOException e) { + Log.debug("Caught ignorable exception ", e); + } + } + } + + /** + * This method creates Configuration with HBase resources using configuration + * properties. 
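+ * For example, the backing properties file might contain entries such as the
+ * following (illustrative values only; the property names are the ones read
+ * in this method):
+ *
+ *  hbase.zookeeper.quorum=zkhost1,zkhost2,zkhost3
+ *  hbase.zookeeper.clientPort=2181
+ *  hbase.client.retries.number=3
+ *  zookeeper.session.timeout=60000
+ *  zookeeper.recovery.retry=3
+ *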
The same Configuration object will be used to communicate with + * all HBase tables; + * + * @return Configuration object + */ + public static Configuration read() { + if (config == null) { + synchronized (HBaseConfigurationUtil.class) { + if (config == null) { + config = HBaseConfiguration.create(); + + config.set( + HBaseConfigConstants.HBASE_ZOOKEEPER_QUORUM, + ConfigurationUtil.getConfiguration().getString( + "hbase.zookeeper.quorum")); + config.set( + HBaseConfigConstants.HBASE_ZOOKEEPER_CLIENT_PORT, + ConfigurationUtil.getConfiguration().getString( + "hbase.zookeeper.clientPort")); + config.set( + HBaseConfigConstants.HBASE_CLIENT_RETRIES_NUMBER, + ConfigurationUtil.getConfiguration().getString( + "hbase.client.retries.number")); + config.set( + HBaseConfigConstants.HBASE_ZOOKEEPER_SESSION_TIMEOUT, + ConfigurationUtil.getConfiguration().getString( + "zookeeper.session.timeout")); + config.set( + HBaseConfigConstants.HBASE_ZOOKEEPER_RECOVERY_RETRY, + ConfigurationUtil.getConfiguration().getString( + "zookeeper.recovery.retry")); + } + } + } + return config; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapGetter.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapGetter.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapGetter.java new file mode 100644 index 0000000..3bb10c2 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapGetter.java @@ -0,0 +1,88 @@ +/** + * + */ +package com.apache.metron.pcapservice; + +import java.io.IOException; +import java.util.List; + +/** + * interface to all 'keys' based pcaps fetching methods. + * + * @author Sayi + */ +public interface IPcapGetter { + + /** + * Gets the pcaps for the input list of keys and lastRowKey. + * + * @param keys + * the list of keys for which pcaps are to be retrieved + * @param lastRowKey + * last row key from the previous partial response + * @param startTime + * the start time in system milliseconds to be used to filter the + * pcaps. The value is set to '0' if the caller sends negative value + * @param endTime + * the end time in system milliseconds to be used to filter the + * pcaps. The value is set to Long.MAX_VALUE if the caller sends + * negative value. 'endTime' must be greater than the 'startTime'. + * @param includeReverseTraffic + * indicates whether or not to include pcaps from the reverse traffic + * @param includeDuplicateLastRow + * indicates whether or not to include the last row from the previous + * partial response + * @param maxResultSize + * the max result size + * @return PcapsResponse with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public PcapsResponse getPcaps(List keys, String lastRowKey, + long startTime, long endTime, boolean includeReverseTraffic, + boolean includeDuplicateLastRow, long maxResultSize) throws IOException; + + /** + * Gets the pcaps for the input key. + * + * @param key + * the key for which pcaps is to be retrieved. + * @param startTime + * the start time in system milliseconds to be used to filter the + * pcaps. 
The value is set to '0' if the caller sends negative value + * @param endTime + * the end time in system milliseconds to be used to filter the + * pcaps.The value is set to Long.MAX_VALUE if the caller sends + * negative value. 'endTime' must be greater than the 'startTime'. + * @param includeReverseTraffic + * indicates whether or not to include pcaps from the reverse traffic + * @return PcapsResponse with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public PcapsResponse getPcaps(String key, long startTime, long endTime, + boolean includeReverseTraffic) throws IOException; + + /** + * Gets the pcaps for the input list of keys. + * + * @param keys + * the list of keys for which pcaps are to be retrieved. + * @return PcapsResponse with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public PcapsResponse getPcaps(List keys) throws IOException; + + /** + * Gets the pcaps for the input key. + * + * @param key + * the key for which pcaps is to be retrieved. + * @return PcapsResponse with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public PcapsResponse getPcaps(String key) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java new file mode 100644 index 0000000..195c5d6 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/IPcapScanner.java @@ -0,0 +1,49 @@ +package com.apache.metron.pcapservice; + +import java.io.IOException; + +/** + * The Interface for all pcaps fetching methods based on key range. + */ +public interface IPcapScanner { + + /** + * Gets the pcaps for between startKey (inclusive) and endKey (exclusive). + * + * @param startKey + * the start key of a key range for which pcaps is to be retrieved. + * @param endKey + * the end key of a key range for which pcaps is to be retrieved. + * @param maxResponseSize + * indicates the maximum response size in MegaBytes(MB). User needs + * to pass positive value and must be less than 60 (MB) + * @param startTime + * the start time in system milliseconds to be used to filter the + * pcaps. The value is set to '0' if the caller sends negative value + * @param endTime + * the end time in system milliseconds to be used to filter the + * pcaps. The value is set Long.MAX_VALUE if the caller sends + * negative value + * @return byte array with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public byte[] getPcaps(String startKey, String endKey, long maxResponseSize, + long startTime, long endTime) throws IOException; + + /** + * Gets the pcaps for between startKey (inclusive) and endKey (exclusive). + * + * @param startKey + * the start key (inclusive) of a key range for which pcaps is to be + * retrieved. + * @param endKey + * the end key (exclusive) of a key range for which pcaps is to be + * retrieved. 
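+ * For example (hypothetical keys, following the row-key layout used by the
+ * HBase-backed implementation, where trailing fragment tokens are padded with
+ * 00000/99999):
+ *  startKey = "18800006-1800000b-06-0019-caac-00000-00000"
+ *  endKey   = "18800006-1800000b-06-0019-caac-99999-99999"
+ * would cover the whole key range stored for that flow.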
+ * @return byte array with all matching pcaps merged together + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public byte[] getPcaps(String startKey, String endKey) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java new file mode 100644 index 0000000..d45a7e9 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapGetterHBaseImpl.java @@ -0,0 +1,809 @@ +package com.apache.metron.pcapservice; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Resource; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.NoServerForRegionException; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Singleton class which integrates with HBase table and returns pcaps sorted by + * timestamp(dsc) for the given list of keys. Creates HConnection if it is not + * already created and the same connection instance is being used for all + * requests + * + * @author sheetal + * @version $Revision: 1.0 $ + */ + +@Path("/") +public class PcapGetterHBaseImpl implements IPcapGetter { + + /** The pcap getter h base. */ + private static IPcapGetter pcapGetterHBase = null; + + /** The Constant LOG. */ + private static final Logger LOGGER = Logger + .getLogger(PcapGetterHBaseImpl.class); + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List, + * java.lang.String, long, long, boolean, boolean, long) + */ + + + @GET + @Path("pcap/test") + @Produces("text/html") + public Response index() throws URISyntaxException { + return Response.ok("ALL GOOD").build(); + } + + + public PcapsResponse getPcaps(List keys, String lastRowKey, + long startTime, long endTime, boolean includeReverseTraffic, + boolean includeDuplicateLastRow, long maxResultSize) throws IOException { + Assert + .isTrue( + checkIfValidInput(keys, lastRowKey), + "No valid input. 
One of the value must be present from {keys, lastRowKey}"); + LOGGER.info(" keys=" + keys.toString() + "; lastRowKey=" + + lastRowKey); + + PcapsResponse pcapsResponse = new PcapsResponse(); + // 1. Process partial response key + if (StringUtils.isNotEmpty(lastRowKey)) { + pcapsResponse = processKey(pcapsResponse, lastRowKey, startTime, + endTime, true, includeDuplicateLastRow, maxResultSize); + // LOGGER.debug("after scanning lastRowKey=" + + // pcapsResponse.toString()+"*********************************************************************"); + if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) { + return pcapsResponse; + } + } + // 2. Process input keys + List sortedKeys = sortKeysByAscOrder(keys, includeReverseTraffic); + List unprocessedKeys = new ArrayList(); + unprocessedKeys.addAll(sortedKeys); + if (StringUtils.isNotEmpty(lastRowKey)) { + unprocessedKeys.clear(); + unprocessedKeys = getUnprocessedSublistOfKeys(sortedKeys, + lastRowKey); + } + LOGGER.info("unprocessedKeys in getPcaps" + unprocessedKeys.toString()); + if (!CollectionUtils.isEmpty(unprocessedKeys)) { + for (int i = 0; i < unprocessedKeys.size(); i++) { + pcapsResponse = processKey(pcapsResponse, unprocessedKeys.get(i), + startTime, endTime, false, includeDuplicateLastRow, maxResultSize); + // LOGGER.debug("after scanning input unprocessedKeys.get(" + i + ") =" + // + + // pcapsResponse.toString()+"*********************************************************************"); + if (pcapsResponse.getStatus() == PcapsResponse.Status.PARTIAL) { + return pcapsResponse; + } + } + } + return pcapsResponse; + } + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String, long, + * long, boolean) + */ + + public PcapsResponse getPcaps(String key, long startTime, long endTime, + boolean includeReverseTraffic) throws IOException { + Assert.hasText(key, "key must not be null or empty"); + return getPcaps(Arrays.asList(key), null, startTime, endTime, + includeReverseTraffic, false, ConfigurationUtil.getDefaultResultSize()); + } + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.util.List) + */ + + public PcapsResponse getPcaps(List keys) throws IOException { + Assert.notEmpty(keys, "'keys' must not be null or empty"); + return getPcaps(keys, null, -1, -1, + ConfigurationUtil.isDefaultIncludeReverseTraffic(), false, + ConfigurationUtil.getDefaultResultSize()); + } + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapGetter#getPcaps(java.lang.String) + */ + + public PcapsResponse getPcaps(String key) throws IOException { + Assert.hasText(key, "key must not be null or empty"); + return getPcaps(Arrays.asList(key), null, -1, -1, + ConfigurationUtil.isDefaultIncludeReverseTraffic(), false, + ConfigurationUtil.getDefaultResultSize()); + } + + /** + * Always returns the singleton instance. + * + * @return IPcapGetter singleton instance + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public static IPcapGetter getInstance() throws IOException { + if (pcapGetterHBase == null) { + synchronized (PcapGetterHBaseImpl.class) { + if (pcapGetterHBase == null) { + pcapGetterHBase = new PcapGetterHBaseImpl(); + } + } + } + return pcapGetterHBase; + } + + /** + * Instantiates a new pcap getter h base impl. + */ + private PcapGetterHBaseImpl() { + } + + /** + * Adds reverse keys to the list if the flag 'includeReverseTraffic' is set to + * true; removes duplicates and sorts the list by ascending order;. 
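+ * For example (hypothetical key, with includeReverseTraffic set to true):
+ *  input  : [18800006-1800000b-06-0019-caac]
+ *  output : [1800000b-18800006-06-caac-0019, 18800006-1800000b-06-0019-caac]
+ * where the reverse key is produced by PcapHelper.reverseKey and the result
+ * is de-duplicated and sorted in ascending order.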
+ * + * @param keys + * input keys + * @param includeReverseTraffic + * flag whether or not to include reverse traffic + * @return List + */ + @VisibleForTesting + List sortKeysByAscOrder(List keys, + boolean includeReverseTraffic) { + Assert.notEmpty(keys, "'keys' must not be null"); + if (includeReverseTraffic) { + keys.addAll(PcapHelper.reverseKey(keys)); + } + List deDupKeys = removeDuplicateKeys(keys); + Collections.sort(deDupKeys); + return deDupKeys; + } + + /** + * Removes the duplicate keys. + * + * @param keys + * the keys + * @return the list + */ + @VisibleForTesting +public + List removeDuplicateKeys(List keys) { + Set set = new HashSet(keys); + return new ArrayList(set); + } + + /** + *

+ * Returns the sublist starting from the element after the lastRowKey
+ * to the last element in the list; if the 'lastRowKey' is not matched,
+ * the complete list will be returned.
+ *
+ * E.g.:
+ *  keys = [18800006-1800000b-06-0019-caac, 18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+ *  lastRowKey = "18800006-1800000b-06-0019-caac-65140-40815"
+ *  and the response from this method is [18800006-1800000b-06-0050-5af6, 18800006-1800000b-11-0035-3810]
+ * + * @param keys + * keys + * @param lastRowKey + * last row key of the previous partial response + * @return List + */ + @VisibleForTesting + List getUnprocessedSublistOfKeys(List keys, + String lastRowKey) { + Assert.notEmpty(keys, "'keys' must not be null"); + Assert.hasText(lastRowKey, "'lastRowKey' must not be null"); + String partialKey = getTokens(lastRowKey, 5); + int startIndex = 0; + for (int i = 0; i < keys.size(); i++) { + if (partialKey.equals(keys.get(i))) { + startIndex = i + 1; + break; + } + } + List unprocessedKeys = keys.subList(startIndex, keys.size()); + return unprocessedKeys; + } + + /** + * Returns the first 'noOfTokens' tokens from the given key; token delimiter + * "-";. + * + * @param key + * given key + * @param noOfTokens + * number of tokens to retrieve + * @return the tokens + */ + @VisibleForTesting + String getTokens(String key, int noOfTokens) { + String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER; + String regex = "\\" + delimeter; + String[] keyTokens = key.split(regex); + Assert.isTrue(noOfTokens < keyTokens.length, + "Invalid value for 'noOfTokens'"); + StringBuffer sbf = new StringBuffer(); + for (int i = 0; i < noOfTokens; i++) { + sbf.append(keyTokens[i]); + if (i != (noOfTokens - 1)) { + sbf.append(HBaseConfigConstants.PCAP_KEY_DELIMETER); + } + + } + return sbf.toString(); + } + + /** + * Process key. + * + * @param pcapsResponse + * the pcaps response + * @param key + * the key + * @param startTime + * the start time + * @param endTime + * the end time + * @param isPartialResponse + * the is partial response + * @param includeDuplicateLastRow + * the include duplicate last row + * @param maxResultSize + * the max result size + * @return the pcaps response + * @throws IOException + * Signals that an I/O exception has occurred. + */ + @VisibleForTesting + PcapsResponse processKey(PcapsResponse pcapsResponse, String key, + long startTime, long endTime, boolean isPartialResponse, + boolean includeDuplicateLastRow, long maxResultSize) throws IOException { + HTable table = null; + Scan scan = null; + List scannedCells = null; + try { + // 1. Create start and stop row for the key; + Map keysMap = createStartAndStopRowKeys(key, + isPartialResponse, includeDuplicateLastRow); + + // 2. if the input key contains all fragments (7) and it is not part + // of previous partial response (isPartialResponse), + // 'keysMap' will be null; do a Get; currently not doing any + // response size related checks for Get; + // by default all cells from a specific row are sorted by timestamp + if (keysMap == null) { + Get get = createGetRequest(key, startTime, endTime); + List cells = executeGetRequest(table, get); + for (Cell cell : cells) { + pcapsResponse.addPcaps(CellUtil.cloneValue(cell)); + } + return pcapsResponse; + } + // 3. 
Create and execute Scan request + scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime, + maxResultSize); + scannedCells = executeScanRequest(table, scan); + LOGGER.info("scannedCells size :" + scannedCells.size()); + addToResponse(pcapsResponse, scannedCells, maxResultSize); + + } catch (IOException e) { + LOGGER.error("Exception occurred while fetching Pcaps for the keys :" + + key, e); + if (e instanceof ZooKeeperConnectionException + || e instanceof MasterNotRunningException + || e instanceof NoServerForRegionException) { + int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit(); + System.out.println("maxRetryLimit =" + maxRetryLimit); + for (int attempt = 1; attempt <= maxRetryLimit; attempt++) { + System.out.println("attempting =" + attempt); + try { + HBaseConfigurationUtil.closeConnection(); // closing the + // existing + // connection + // and retry, + // it will + // create a new + // HConnection + scannedCells = executeScanRequest(table, scan); + addToResponse(pcapsResponse, scannedCells, maxResultSize); + break; + } catch (IOException ie) { + if (attempt == maxRetryLimit) { + LOGGER.error("Throwing the exception after retrying " + + maxRetryLimit + " times."); + throw e; + } + } + } + } + + } finally { + if (table != null) { + table.close(); + } + } + return pcapsResponse; + } + + /** + * Adds the to response. + * + * @param pcapsResponse + * the pcaps response + * @param scannedCells + * the scanned cells + * @param maxResultSize + * the max result size + */ + private void addToResponse(PcapsResponse pcapsResponse, + List scannedCells, long maxResultSize) { + String lastKeyFromCurrentScan = null; + if (scannedCells != null && scannedCells.size() > 0) { + lastKeyFromCurrentScan = new String(CellUtil.cloneRow(scannedCells + .get(scannedCells.size() - 1))); + } + // 4. calculate the response size + Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator()); + for (Cell sortedCell : scannedCells) { + pcapsResponse.addPcaps(CellUtil.cloneValue(sortedCell)); + } + if (!pcapsResponse.isResonseSizeWithinLimit(maxResultSize)) { + pcapsResponse.setStatus(PcapsResponse.Status.PARTIAL); // response size + // reached + pcapsResponse.setLastRowKey(new String(lastKeyFromCurrentScan)); + } + } + + /** + * Builds start and stop row keys according to the following logic : 1. + * Creates tokens out of 'key' using pcap_id delimiter ('-') 2. if the input + * 'key' contains (assume : configuredTokensInRowKey=7 and + * minimumTokensIninputKey=5): a). 5 tokens + * ("srcIp-dstIp-protocol-srcPort-dstPort") startKey = + * "srcIp-dstIp-protocol-srcPort-dstPort-00000-00000" stopKey = + * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" b). 6 tokens + * ("srcIp-dstIp-protocol-srcPort-dstPort-id1") startKey = + * "srcIp-dstIp-protocol-srcPort-dstPort-id1-00000" stopKey = + * "srcIp-dstIp-protocol-srcPort-dstPort-id1-99999" + * + * c). 7 tokens ("srcIp-dstIp-protocol-srcPort-dstPort-id1-id2") 1>. if the + * key is NOT part of the partial response from previous request, return + * 'null' 2>. 
if the key is part of partial response from previous request + * startKey = "srcIp-dstIp-protocol-srcPort-dstPort-id1-(id2+1)"; 1 is added + * to exclude this key as it was included in the previous request stopKey = + * "srcIp-dstIp-protocol-srcPort-dstPort-99999-99999" + * + * @param key + * the key + * @param isLastRowKey + * if the key is part of partial response + * @param includeDuplicateLastRow + * the include duplicate last row + * @return Map + */ + @VisibleForTesting + Map createStartAndStopRowKeys(String key, + boolean isLastRowKey, boolean includeDuplicateLastRow) { + String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER; + String regex = "\\" + delimeter; + String[] keyTokens = key.split(regex); + + String startKey = null; + String endKey = null; + Map map = new HashMap(); + + int configuredTokensInRowKey = ConfigurationUtil + .getConfiguredTokensInRowkey(); + int minimumTokensIninputKey = ConfigurationUtil + .getMinimumTokensInInputkey(); + Assert + .isTrue( + minimumTokensIninputKey <= configuredTokensInRowKey, + "tokens in the input key (separated by '-'), must be less than or equal to the tokens used in hbase table row key "); + // in case if the input key contains 'configuredTokensInRowKey' tokens and + // it is NOT a + // partial response key, do a Get instead of Scan + if (keyTokens.length == configuredTokensInRowKey) { + if (!isLastRowKey) { + return null; + } + // it is a partial response key; 'startKey' is same as input partial + // response key; 'endKey' can be built by replacing + // (configuredTokensInRowKey - minimumTokensIninputKey) tokens + // of input partial response key with '99999' + if (keyTokens.length == minimumTokensIninputKey) { + return null; + } + int appendingTokenSlots = configuredTokensInRowKey + - minimumTokensIninputKey; + if (appendingTokenSlots > 0) { + String partialKey = getTokens(key, minimumTokensIninputKey); + StringBuffer sbfStartNew = new StringBuffer(partialKey); + StringBuffer sbfEndNew = new StringBuffer(partialKey); + for (int i = 0; i < appendingTokenSlots; i++) { + if (i == (appendingTokenSlots - 1)) { + if (!includeDuplicateLastRow) { + sbfStartNew + .append(HBaseConfigConstants.PCAP_KEY_DELIMETER) + .append( + Integer.valueOf(keyTokens[minimumTokensIninputKey + i]) + 1); + } else { + sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER) + .append(keyTokens[minimumTokensIninputKey + i]); + } + } else { + sbfStartNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( + keyTokens[minimumTokensIninputKey + i]); + } + sbfEndNew.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( + getMaxLimitForAppendingTokens()); + } + startKey = sbfStartNew.toString(); + endKey = sbfEndNew.toString(); + } + } else { + StringBuffer sbfStart = new StringBuffer(key); + StringBuffer sbfEnd = new StringBuffer(key); + for (int i = keyTokens.length; i < configuredTokensInRowKey; i++) { + sbfStart.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( + getMinLimitForAppendingTokens()); + sbfEnd.append(HBaseConfigConstants.PCAP_KEY_DELIMETER).append( + getMaxLimitForAppendingTokens()); + } + startKey = sbfStart.toString(); + endKey = sbfEnd.toString(); + } + map.put(HBaseConfigConstants.START_KEY, startKey); + map.put(HBaseConfigConstants.END_KEY, endKey); + + return map; + } + + /** + * Returns false if keys is empty or null AND lastRowKey is null or + * empty; otherwise returns true;. 
+ * + * @param keys + * input row keys + * @param lastRowKey + * partial response key + * @return boolean + */ + @VisibleForTesting + boolean checkIfValidInput(List keys, String lastRowKey) { + if (CollectionUtils.isEmpty(keys) + && StringUtils.isEmpty(lastRowKey)) { + return false; + } + return true; + } + + /** + * Executes the given Get request. + * + * @param table + * hbase table + * @param get + * Get + * @return List + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private List executeGetRequest(HTable table, Get get) + throws IOException { + LOGGER.info("Get :" + get.toString()); + table = (HTable) HBaseConfigurationUtil.getConnection().getTable( + ConfigurationUtil.getTableName()); + Result result = table.get(get); + List cells = result.getColumnCells( + ConfigurationUtil.getColumnFamily(), + ConfigurationUtil.getColumnQualifier()); + return cells; + } + + /** + * Execute scan request. + * + * @param table + * hbase table + * @param scan + * the scan + * @return the list + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private List executeScanRequest(HTable table, Scan scan) + throws IOException { + LOGGER.info("Scan :" + scan.toString()); + table = (HTable) HBaseConfigurationUtil.getConnection().getTable( + ConfigurationUtil.getConfiguration().getString("hbase.table.name")); + ResultScanner resultScanner = table.getScanner(scan); + List scannedCells = new ArrayList(); + for (Result result = resultScanner.next(); result != null; result = resultScanner + .next()) { + List cells = result.getColumnCells( + ConfigurationUtil.getColumnFamily(), + ConfigurationUtil.getColumnQualifier()); + if (cells != null) { + for (Cell cell : cells) { + scannedCells.add(cell); + } + } + } + return scannedCells; + } + + /** + * Creates the get request. + * + * @param key + * the key + * @param startTime + * the start time + * @param endTime + * the end time + * @return the gets the + * @throws IOException + * Signals that an I/O exception has occurred. + */ + @VisibleForTesting + Get createGetRequest(String key, long startTime, long endTime) + throws IOException { + Get get = new Get(Bytes.toBytes(key)); + // set family name + get.addFamily(ConfigurationUtil.getColumnFamily()); + + // set column family, qualifier + get.addColumn(ConfigurationUtil.getColumnFamily(), + ConfigurationUtil.getColumnQualifier()); + + // set max versions + get.setMaxVersions(ConfigurationUtil.getMaxVersions()); + + // set time range + setTimeRangeOnGet(get, startTime, endTime); + return get; + } + + /** + * Creates the scan request. + * + * @param pcapsResponse + * the pcaps response + * @param keysMap + * the keys map + * @param startTime + * the start time + * @param endTime + * the end time + * @param maxResultSize + * the max result size + * @return the scan + * @throws IOException + * Signals that an I/O exception has occurred. 
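+ * As an illustration of the size bookkeeping performed below (numbers are
+ * arbitrary and in the same unit as maxResultSize): with maxResultSize = 60,
+ * a current response size of 20 and a configured maximum row size of 5, the
+ * scan is capped at 60 - (20 + 5) = 35; if the remaining size is not
+ * positive, no cap is set on the scan.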
+ */ + @VisibleForTesting + Scan createScanRequest(PcapsResponse pcapsResponse, + Map keysMap, long startTime, long endTime, + long maxResultSize) throws IOException { + Scan scan = new Scan(); + // set column family, qualifier + scan.addColumn(ConfigurationUtil.getColumnFamily(), + ConfigurationUtil.getColumnQualifier()); + + // set start and stop keys + scan.setStartRow(keysMap.get(HBaseConfigConstants.START_KEY).getBytes()); + scan.setStopRow(keysMap.get(HBaseConfigConstants.END_KEY).getBytes()); + + // set max results size : remaining size = max results size - ( current + // pcaps response size + possible maximum row size) + long remainingSize = maxResultSize + - (pcapsResponse.getResponseSize() + ConfigurationUtil.getMaxRowSize()); + + if (remainingSize > 0) { + scan.setMaxResultSize(remainingSize); + } + // set max versions + scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt( + "hbase.table.column.maxVersions")); + + // set time range + setTimeRangeOnScan(scan, startTime, endTime); + return scan; + } + + /** + * Sets the time range on scan. + * + * @param scan + * the scan + * @param startTime + * the start time + * @param endTime + * the end time + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private void setTimeRangeOnScan(Scan scan, long startTime, long endTime) + throws IOException { + boolean setTimeRange = true; + if (startTime < 0 && endTime < 0) { + setTimeRange = false; + } + if (setTimeRange) { + if (startTime < 0) { + startTime = 0; + } else { + startTime = PcapHelper.convertToDataCreationTimeUnit(startTime); + } + if (endTime < 0) { + endTime = Long.MAX_VALUE; + } else { + endTime = PcapHelper.convertToDataCreationTimeUnit(endTime); + } + Assert.isTrue(startTime < endTime, + "startTime value must be less than endTime value"); + scan.setTimeRange(startTime, endTime); + } + } + + /** + * Sets the time range on get. + * + * @param get + * the get + * @param startTime + * the start time + * @param endTime + * the end time + * @throws IOException + * Signals that an I/O exception has occurred. + */ + private void setTimeRangeOnGet(Get get, long startTime, long endTime) + throws IOException { + boolean setTimeRange = true; + if (startTime < 0 && endTime < 0) { + setTimeRange = false; + } + if (setTimeRange) { + if (startTime < 0) { + startTime = 0; + } else { + startTime = PcapHelper.convertToDataCreationTimeUnit(startTime); + } + if (endTime < 0) { + endTime = Long.MAX_VALUE; + } else { + endTime = PcapHelper.convertToDataCreationTimeUnit(endTime); + } + Assert.isTrue(startTime < endTime, + "startTime value must be less than endTime value"); + get.setTimeRange(startTime, endTime); + } + } + + /** + * Gets the min limit for appending tokens. + * + * @return the min limit for appending tokens + */ + private String getMinLimitForAppendingTokens() { + int digits = ConfigurationUtil.getAppendingTokenDigits(); + StringBuffer sbf = new StringBuffer(); + for (int i = 0; i < digits; i++) { + sbf.append("0"); + } + return sbf.toString(); + } + + /** + * Gets the max limit for appending tokens. + * + * @return the max limit for appending tokens + */ + private String getMaxLimitForAppendingTokens() { + int digits = ConfigurationUtil.getAppendingTokenDigits(); + StringBuffer sbf = new StringBuffer(); + for (int i = 0; i < digits; i++) { + sbf.append("9"); + } + return sbf.toString(); + } + + /** + * The main method. + * + * @param args + * the arguments + * + * @throws IOException + * Signals that an I/O exception has occurred. 
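+ * Illustrative invocation (the argument positions follow the parsing below:
+ * args[1] = output file, args[2] = comma-separated keys, optional args[3]
+ * and args[4] = start and end time; args[0] is ignored, so the first token
+ * is only a placeholder):
+ *
+ *  java com.apache.metron.pcapservice.PcapGetterHBaseImpl placeholder \
+ *    /tmp/out.pcap 18800006-1800000b-06-0019-caac,1800000b-18800006-06-caac-0019 \
+ *    0 1453818000000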
+ */ + public static void main(String[] args) throws IOException { + if (args == null || args.length < 2) { + usage(); + return; + } + String outputFileName = null; + outputFileName = args[1]; + List keys = Arrays.asList(StringUtils.split(args[2], ",")); + System.out.println("Geting keys " + keys); + long startTime = 0; + long endTime = Long.MAX_VALUE; + if (args.length > 3) { + startTime = Long.valueOf(args[3]); + } + if (args.length > 4) { + endTime = Long.valueOf(args[4]); + } + System.out.println("With start time " + startTime + " and end time " + + endTime); + PcapGetterHBaseImpl downloader = new PcapGetterHBaseImpl(); + PcapsResponse pcaps = downloader.getPcaps(keys, null, startTime, endTime, + false, false, 6); + File file = new File(outputFileName); + FileUtils.write(file, "", false); + FileUtils.writeByteArrayToFile(file, pcaps.getPcaps(), true); + } + + /** + * Usage. + */ + private static void usage() { + System.out.println("java " + PcapGetterHBaseImpl.class.getName() // $codepro.audit.disable + // debuggingCode + + " [stop key]"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapHelper.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapHelper.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapHelper.java new file mode 100644 index 0000000..6018065 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapHelper.java @@ -0,0 +1,205 @@ +package com.apache.metron.pcapservice; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.mortbay.log.Log; +import org.springframework.util.Assert; + +import com.google.common.annotations.VisibleForTesting; + +/** + * utility class which holds methods related to time conversions, building + * reverse keys. + */ +public class PcapHelper { + + /** The Constant LOGGER. */ + private static final Logger LOGGER = Logger.getLogger(PcapHelper.class); + + /** The cell timestamp comparator. */ + private static CellTimestampComparator CELL_TIMESTAMP_COMPARATOR = new CellTimestampComparator(); + + /** + * The Enum TimeUnit. + */ + public enum TimeUnit { + + /** The seconds. */ + SECONDS, + /** The millis. */ + MILLIS, + /** The micros. */ + MICROS, + /** The unknown. */ + UNKNOWN + }; + + /** + * Converts the given time to the 'hbase' data creation time unit. 
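+ * The magnitude of the value decides how it is interpreted: up to 10 digits
+ * is treated as seconds, up to 13 digits as milliseconds, and up to 16 digits
+ * as microseconds. For example, assuming hbase.table.data.time.unit is MILLIS
+ * (illustrative input values):
+ *  1453818000       (seconds) -> 1453818000000
+ *  1453818000000    (millis)  -> 1453818000000
+ *  1453818000000000 (micros)  -> 1453818000000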
+ * + * @param inputTime + * the input time + * @return the long + */ + public static long convertToDataCreationTimeUnit(long inputTime) { + if (inputTime <= 9999999999L) { + return convertSecondsToDataCreationTimeUnit(inputTime); // input time unit + // is in seconds + } else if (inputTime <= 9999999999999L) { + return convertMillisToDataCreationTimeUnit(inputTime); // input time unit + // is in millis + } else if (inputTime <= 9999999999999999L) { + return convertMicrosToDataCreationTimeUnit(inputTime); // input time unit + // it in micros + } + return inputTime; // input time unit is unknown + } + + /** + * Returns the 'hbase' data creation time unit by reading + * 'hbase.table.data.time.unit' property in 'hbase-config' properties file; If + * none is mentioned in properties file, returns TimeUnit.UNKNOWN + * + * @return TimeUnit + */ + @VisibleForTesting + public static TimeUnit getDataCreationTimeUnit() { + String timeUnit = ConfigurationUtil.getConfiguration().getString( + "hbase.table.data.time.unit"); + LOGGER.debug("hbase.table.data.time.unit=" + timeUnit.toString()); + if (StringUtils.isNotEmpty(timeUnit)) { + return TimeUnit.valueOf(timeUnit); + } + return TimeUnit.UNKNOWN; + } + + /** + * Convert seconds to data creation time unit. + * + * @param inputTime + * the input time + * @return the long + */ + @VisibleForTesting + public static long convertSecondsToDataCreationTimeUnit(long inputTime) { + System.out.println("convert Seconds To DataCreation TimeUnit"); + TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit(); + if (TimeUnit.SECONDS == dataCreationTimeUnit) { + return inputTime; + } else if (TimeUnit.MILLIS == dataCreationTimeUnit) { + return inputTime * 1000; + } else if (TimeUnit.MICROS == dataCreationTimeUnit) { + return inputTime * 1000 * 1000; + } + return inputTime; + } + + /** + * Builds the reverseKey to fetch the pcaps in the reverse traffic + * (destination to source). + * + * @param key + * indicates hbase rowKey (partial or full) in the format + * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment" + * @return String indicates the key in the format + * "dstAddr-srcAddr-protocol-dstPort-srcPort" + */ + public static String reverseKey(String key) { + Assert.hasText(key, "key must not be null or empty"); + String delimeter = HBaseConfigConstants.PCAP_KEY_DELIMETER; + String regex = "\\" + delimeter; + StringBuffer sb = new StringBuffer(); + try { + String[] tokens = key.split(regex); + Assert + .isTrue( + (tokens.length == 5 || tokens.length == 6 || tokens.length == 7), + "key is not in the format : 'srcAddr-dstAddr-protocol-srcPort-dstPort-{ipId-fragment identifier}'"); + sb.append(tokens[1]).append(delimeter).append(tokens[0]) + .append(delimeter).append(tokens[2]).append(delimeter) + .append(tokens[4]).append(delimeter).append(tokens[3]); + } catch (Exception e) { + Log.warn("Failed to reverse the key. Reverse scan won't be performed.", e); + } + return sb.toString(); + } + + /** + * Builds the reverseKeys to fetch the pcaps in the reverse traffic + * (destination to source). 
If all keys in the input are not in the expected + * format, it returns an empty list; + * + * @param keys + * indicates list of hbase rowKeys (partial or full) in the format + * "srcAddr-dstAddr-protocol-srcPort-dstPort-fragment" + * @return List indicates the list of keys in the format + * "dstAddr-srcAddr-protocol-dstPort-srcPort" + */ + public static List reverseKey(List keys) { + Assert.notEmpty(keys, "'keys' must not be null or empty"); + List reverseKeys = new ArrayList(); + for (String key : keys) { + if (key != null) { + String reverseKey = reverseKey(key); + if (StringUtils.isNotEmpty(reverseKey)) { + reverseKeys.add(reverseKey); + } + } + } + return reverseKeys; + } + + /** + * Returns Comparator for sorting pcaps cells based on the timestamp (dsc). + * + * @return CellTimestampComparator + */ + public static CellTimestampComparator getCellTimestampComparator() { + return CELL_TIMESTAMP_COMPARATOR; + } + + /** + * Convert millis to data creation time unit. + * + * @param inputTime + * the input time + * @return the long + */ + @VisibleForTesting + private static long convertMillisToDataCreationTimeUnit(long inputTime) { + System.out.println("convert Millis To DataCreation TimeUnit"); + TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit(); + if (TimeUnit.SECONDS == dataCreationTimeUnit) { + return (inputTime / 1000); + } else if (TimeUnit.MILLIS == dataCreationTimeUnit) { + return inputTime; + } else if (TimeUnit.MICROS == dataCreationTimeUnit) { + return inputTime * 1000; + } + return inputTime; + } + + /** + * Convert micros to data creation time unit. + * + * @param inputTime + * the input time + * @return the long + */ + @VisibleForTesting + private static long convertMicrosToDataCreationTimeUnit(long inputTime) { + System.out.println("convert Micros To DataCreation TimeUnit"); + TimeUnit dataCreationTimeUnit = getDataCreationTimeUnit(); + if (TimeUnit.SECONDS == dataCreationTimeUnit) { + return inputTime / (1000 * 1000); + } else if (TimeUnit.MILLIS == dataCreationTimeUnit) { + return inputTime / 1000; + } else if (TimeUnit.MICROS == dataCreationTimeUnit) { + return inputTime; + } + return inputTime; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java new file mode 100644 index 0000000..6d33824 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java @@ -0,0 +1,256 @@ +package com.apache.metron.pcapservice; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; + +import com.google.common.annotations.VisibleForTesting; +import com.apache.metron.dataservices.auth.AuthTokenFilter; +import com.apache.metron.pcap.PcapUtils; + +@Path("/pcap/") +public class 
PcapReceiverImplRestEasy { + + /** The Constant LOGGER. */ + private static final Logger LOGGER = Logger + .getLogger(PcapReceiverImplRestEasy.class); + + /** The Constant HEADER_CONTENT_DISPOSITION_NAME. */ + private static final String HEADER_CONTENT_DISPOSITION_NAME = "Content-Disposition"; + + /** The Constant HEADER_CONTENT_DISPOSITION_VALUE. */ + private static final String HEADER_CONTENT_DISPOSITION_VALUE = "attachment; filename=\"managed-threat.pcap\""; + + /** partial response key header name. */ + private static final String HEADER_PARTIAL_RESPONE_KEY = "lastRowKey"; + + @AuthTokenFilter + @GET + @Path("/pcapGetter/getPcapsByKeys") + public Response getPcapsByKeys( + @QueryParam("keys") List keys, + @QueryParam("lastRowKey") String lastRowKey, + @DefaultValue("-1") @QueryParam("startTime") long startTime, + @DefaultValue("-1") @QueryParam("endTime") long endTime, + @QueryParam("includeDuplicateLastRow") boolean includeDuplicateLastRow, + @QueryParam("includeReverseTraffic") boolean includeReverseTraffic, + @QueryParam("maxResponseSize") String maxResponseSize, + @Context HttpServletResponse response) throws IOException { + PcapsResponse pcapResponse = null; + + LOGGER.debug( "/pcapGetter/getPcapsByKeys"); + + if (keys == null || keys.size() == 0) { + LOGGER.debug( "no keys provided" ); + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'keys' must not be null or empty").build(); + } + + try { + IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance(); + pcapResponse = pcapGetter.getPcaps(parseKeys(keys), lastRowKey, + startTime, endTime, includeReverseTraffic, + includeDuplicateLastRow, + ConfigurationUtil.validateMaxResultSize(maxResponseSize)); + LOGGER.info("pcaps response in REST layer =" + + pcapResponse.toString()); + + // return http status '204 No Content' if the pcaps response size is + // 0 + if (pcapResponse == null || pcapResponse.getResponseSize() == 0) { + + return Response.status(Response.Status.NO_CONTENT).build(); + } + + // return http status '206 Partial Content', the partial response + // file and + // 'lastRowKey' header , if the pcaps response status is 'PARTIAL' + + response.setHeader(HEADER_CONTENT_DISPOSITION_NAME, + HEADER_CONTENT_DISPOSITION_VALUE); + + if (pcapResponse.getStatus() == PcapsResponse.Status.PARTIAL) { + + response.setHeader(HEADER_PARTIAL_RESPONE_KEY, + pcapResponse.getLastRowKey()); + + return Response + .ok(pcapResponse.getPcaps(), + MediaType.APPLICATION_OCTET_STREAM).status(206) + .build(); + + } + + } catch (IOException e) { + LOGGER.error( + "Exception occurred while fetching Pcaps for the keys :" + + keys.toString(), e); + throw e; + } + + // return http status '200 OK' along with the complete pcaps response + // file, + // and headers + // return new ResponseEntity(pcapResponse.getPcaps(), headers, + // HttpStatus.OK); + + return Response + .ok(pcapResponse.getPcaps(), MediaType.APPLICATION_OCTET_STREAM) + .status(200).build(); + + } + + @AuthTokenFilter + @GET + @Path("/pcapGetter/getPcapsByKeyRange") + + public Response getPcapsByKeyRange( + @QueryParam("startKey") String startKey, + @QueryParam("endKey")String endKey, + @QueryParam("maxResponseSize") String maxResponseSize, + @DefaultValue("-1") @QueryParam("startTime")long startTime, + @DefaultValue("-1") @QueryParam("endTime") long endTime, + @Context HttpServletResponse servlet_response) throws IOException { + + if (startKey == null || startKey.isEmpty()) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'start key' 
must not be null or empty").build(); + + if (endKey == null || endKey.isEmpty()) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'end key' must not be null or empty").build(); + + + byte[] response = null; + try { + IPcapScanner pcapScanner = PcapScannerHBaseImpl.getInstance(); + response = pcapScanner.getPcaps(startKey, endKey, + ConfigurationUtil.validateMaxResultSize(maxResponseSize), startTime, + endTime); + if (response == null || response.length == 0) { + + return Response.status(Response.Status.NO_CONTENT).entity("No Data").build(); + + } + servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME, + HEADER_CONTENT_DISPOSITION_VALUE); + + } catch (IOException e) { + LOGGER.error( + "Exception occurred while fetching Pcaps for the key range : startKey=" + + startKey + ", endKey=" + endKey, e); + throw e; + } + // return http status '200 OK' along with the complete pcaps response file, + // and headers + + return Response + .ok(response, MediaType.APPLICATION_OCTET_STREAM) + .status(200).build(); + } + + /* + * (non-Javadoc) + * + * @see + * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang + * .String, java.lang.String, java.lang.String, java.lang.String, + * java.lang.String, long, long, boolean, + * javax.servlet.http.HttpServletResponse) + */ + @AuthTokenFilter + @GET + @Path("/pcapGetter/getPcapsByIdentifiers") + + public Response getPcapsByIdentifiers( + @QueryParam ("srcIp") String srcIp, + @QueryParam ("dstIp") String dstIp, + @QueryParam ("protocol") String protocol, + @QueryParam ("srcPort") String srcPort, + @QueryParam ("dstPort") String dstPort, + @DefaultValue("-1") @QueryParam ("startTime")long startTime, + @DefaultValue("-1") @QueryParam ("endTime")long endTime, + @DefaultValue("false") @QueryParam ("includeReverseTraffic") boolean includeReverseTraffic, + @Context HttpServletResponse servlet_response) + + throws IOException { + + if (srcIp == null || srcIp.equals("")) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'srcIp' must not be null or empty").build(); + + if (dstIp == null || dstIp.equals("")) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'dstIp' must not be null or empty").build(); + + if (protocol == null || protocol.equals("")) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'protocol' must not be null or empty").build(); + + if (srcPort == null || srcPort.equals("")) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'srcPort' must not be null or empty").build(); + + if (dstPort == null || dstPort.equals("")) + return Response.serverError().status(Response.Status.NO_CONTENT) + .entity("'dstPort' must not be null or empty").build(); + + + PcapsResponse response = null; + try { + String sessionKey = PcapUtils.getSessionKey(srcIp, dstIp, protocol, + srcPort, dstPort); + LOGGER.info("sessionKey =" + sessionKey); + IPcapGetter pcapGetter = PcapGetterHBaseImpl.getInstance(); + response = pcapGetter.getPcaps(Arrays.asList(sessionKey), null, + startTime, endTime, includeReverseTraffic, false, + ConfigurationUtil.getDefaultResultSize()); + if (response == null || response.getResponseSize() == 0) { + return Response.status(Response.Status.NO_CONTENT).build(); + } + servlet_response.setHeader(HEADER_CONTENT_DISPOSITION_NAME, + HEADER_CONTENT_DISPOSITION_VALUE); + + } catch (IOException e) { + LOGGER.error("Exception occurred while fetching Pcaps by identifiers :", + e); + throw e; + } + // 
return http status '200 OK' along with the complete pcaps response file, + // and headers + return Response + .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM) + .status(200).build(); + } + /** + * This method parses the each value in the List using delimiter ',' and + * builds a new List;. + * + * @param keys + * list of keys to be parsed + * @return list of keys + */ + @VisibleForTesting + List parseKeys(List keys) { + // Assert.notEmpty(keys); + List parsedKeys = new ArrayList(); + for (String key : keys) { + parsedKeys.addAll(Arrays.asList(StringUtils.split( + StringUtils.trim(key), ","))); + } + return parsedKeys; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java new file mode 100644 index 0000000..e32b80e --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapScannerHBaseImpl.java @@ -0,0 +1,302 @@ +package com.apache.metron.pcapservice; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.NoServerForRegionException; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Logger; +import org.springframework.util.Assert; + +import com.google.common.annotations.VisibleForTesting; +import com.apache.metron.pcap.PcapMerger; + +/** + * Singleton class which integrates with HBase table and returns sorted pcaps + * based on the timestamp for the given range of keys. Creates HConnection if it + * is not already created and the same connection instance is being used for all + * requests + * + * @author sheetal + * @version $Revision: 1.0 $ + */ +public class PcapScannerHBaseImpl implements IPcapScanner { + + /** The Constant LOGGER. */ + private static final Logger LOGGER = Logger + .getLogger(PcapScannerHBaseImpl.class); + + /** The Constant DEFAULT_HCONNECTION_RETRY_LIMIT. */ + private static final int DEFAULT_HCONNECTION_RETRY_LIMIT = 0; + + /** The pcap scanner h base. 
*/ + private static IPcapScanner pcapScannerHBase = null; + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String, + * java.lang.String, long, long, long) + */ + + public byte[] getPcaps(String startKey, String endKey, long maxResultSize, + long startTime, long endTime) throws IOException { + Assert.hasText(startKey, "startKey must no be null or empty"); + byte[] cf = Bytes.toBytes(ConfigurationUtil.getConfiguration() + .getString("hbase.table.column.family")); + byte[] cq = Bytes.toBytes(ConfigurationUtil.getConfiguration() + .getString("hbase.table.column.qualifier")); + // create scan request + Scan scan = createScanRequest(cf, cq, startKey, endKey, maxResultSize, + startTime, endTime); + List pcaps = new ArrayList(); + HTable table = null; + try { + pcaps = scanPcaps(pcaps, table, scan, cf, cq); + } catch (IOException e) { + LOGGER.error( + "Exception occurred while fetching Pcaps for the key range : startKey=" + + startKey + ", endKey=" + endKey, e); + if (e instanceof ZooKeeperConnectionException + || e instanceof MasterNotRunningException + || e instanceof NoServerForRegionException) { + int maxRetryLimit = getConnectionRetryLimit(); + for (int attempt = 1; attempt <= maxRetryLimit; attempt++) { + try { + HBaseConfigurationUtil.closeConnection(); // closing the existing + // connection and retry, + // it will create a new + // HConnection + pcaps = scanPcaps(pcaps, table, scan, cf, cq); + break; + } catch (IOException ie) { + if (attempt == maxRetryLimit) { + System.out.println("Throwing the exception after retrying " + + maxRetryLimit + " times."); + throw e; + } + } + } + } else { + throw e; + } + } finally { + if (table != null) { + table.close(); + } + } + if (pcaps.size() == 1) { + return pcaps.get(0); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, pcaps); + byte[] response = baos.toByteArray(); + return response; + } + + /** + * Creates the scan request. + * + * @param cf + * the cf + * @param cq + * the cq + * @param startKey + * the start key + * @param endKey + * the end key + * @param maxResultSize + * the max result size + * @param startTime + * the start time + * @param endTime + * the end time + * @return the scan + * @throws IOException + * Signals that an I/O exception has occurred. + */ + @VisibleForTesting + Scan createScanRequest(byte[] cf, byte[] cq, String startKey, String endKey, + long maxResultSize, long startTime, long endTime) throws IOException { + Scan scan = new Scan(); + scan.addColumn(cf, cq); + scan.setMaxVersions(ConfigurationUtil.getConfiguration().getInt( + "hbase.table.column.maxVersions")); + scan.setStartRow(startKey.getBytes()); + if (endKey != null) { + scan.setStopRow(endKey.getBytes()); + } + scan.setMaxResultSize(maxResultSize); + boolean setTimeRange = true; + if (startTime < 0 && endTime < 0) { + setTimeRange = false; + } + if (setTimeRange) { + if (startTime < 0) { + startTime = 0; + } else { + startTime = PcapHelper.convertToDataCreationTimeUnit(startTime); + } + if (endTime < 0) { + endTime = Long.MAX_VALUE; + } else { + endTime = PcapHelper.convertToDataCreationTimeUnit(endTime); + } + Assert.isTrue(startTime < endTime, + "startTime value must be less than endTime value"); + } + // create Scan request; + if (setTimeRange) { + scan.setTimeRange(startTime, endTime); + } + return scan; + } + + /** + * Scan pcaps. 
+ * + * @param pcaps + * the pcaps + * @param table + * the table + * @param scan + * the scan + * @param cf + * the cf + * @param cq + * the cq + * @return the list + * @throws IOException + * Signals that an I/O exception has occurred. + */ + @VisibleForTesting + List scanPcaps(List pcaps, HTable table, Scan scan, + byte[] cf, byte[] cq) throws IOException { + LOGGER.info("Scan =" + scan.toString()); + table = (HTable) HBaseConfigurationUtil.getConnection().getTable( + ConfigurationUtil.getConfiguration().getString("hbase.table.name")); + ResultScanner resultScanner = table.getScanner(scan); + List scannedCells = new ArrayList(); + for (Result result = resultScanner.next(); result != null; result = resultScanner + .next()) { + List cells = result.getColumnCells(cf, cq); + if (cells != null) { + for (Cell cell : cells) { + scannedCells.add(cell); + } + } + } + Collections.sort(scannedCells, PcapHelper.getCellTimestampComparator()); + LOGGER.info("sorted cells :" + scannedCells.toString()); + for (Cell sortedCell : scannedCells) { + pcaps.add(CellUtil.cloneValue(sortedCell)); + } + return pcaps; + } + + /** + * Gets the connection retry limit. + * + * @return the connection retry limit + */ + private int getConnectionRetryLimit() { + return ConfigurationUtil.getConfiguration().getInt( + "hbase.hconnection.retries.number", DEFAULT_HCONNECTION_RETRY_LIMIT); + } + + /* + * (non-Javadoc) + * + * @see com.cisco.opensoc.hbase.client.IPcapScanner#getPcaps(java.lang.String, + * java.lang.String) + */ + + public byte[] getPcaps(String startKey, String endKey) throws IOException { + Assert.hasText(startKey, "startKey must no be null or empty"); + Assert.hasText(endKey, "endKey must no be null or empty"); + return getPcaps(startKey, endKey, ConfigurationUtil.getDefaultResultSize(), + -1, -1); + } + + /** + * Always returns the singleton instance. + * + * @return IPcapScanner singleton instance + * @throws IOException + * Signals that an I/O exception has occurred. + */ + public static IPcapScanner getInstance() throws IOException { + if (pcapScannerHBase == null) { + synchronized (PcapScannerHBaseImpl.class) { + if (pcapScannerHBase == null) { + pcapScannerHBase = new PcapScannerHBaseImpl(); + } + } + } + return pcapScannerHBase; + } + + /** + * Instantiates a new pcap scanner h base impl. + */ + private PcapScannerHBaseImpl() { + } + + /** + * The main method. + */ + // public static void main(String[] args) throws IOException { + // if (args == null || args.length < 3) { + // usage(); + // return; + // } + // String outputFileName = null; + // String startKey = null; + // String stopKey = null; + // outputFileName = args[0]; + // startKey = args[1]; + // if (args.length > 2) { // NOPMD by sheetal on 1/29/14 3:55 PM + // stopKey = args[2]; + // } + // PcapScannerHBaseImpl downloader = new PcapScannerHBaseImpl(); + // byte[] pcaps = downloader.getPcaps(startKey, stopKey, defaultResultSize, 0, + // Long.MAX_VALUE); + // File file = new File(outputFileName); + // FileUtils.write(file, "", false); + // ByteArrayOutputStream baos = new ByteArrayOutputStream(); // + // $codepro.audit.disable + // // closeWhereCreated + // PcapMerger.merge(baos, pcaps); + // FileUtils.writeByteArrayToFile(file, baos.toByteArray(), true); + // } + + /** + * Usage. 
+ */ + @SuppressWarnings("unused") + private static void usage() { + System.out.println("java " + PcapScannerHBaseImpl.class.getName() // NOPMD + // by + // sheetal + // + // on + // 1/29/14 + // 3:55 + // PM + + " [stop key]"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java new file mode 100644 index 0000000..ea004e9 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/PcapsResponse.java @@ -0,0 +1,153 @@ +/** + * + */ +package com.apache.metron.pcapservice; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.apache.metron.pcap.PcapMerger; + + + +/** + * Holds pcaps data, status and the partial response key. + * + * @author Sayi + */ +public class PcapsResponse { + + /** + * The Enum Status. + */ + public enum Status { + + /** The partial. */ + PARTIAL, + /** The complete. */ + COMPLETE + }; + + /** response of the processed keys. */ + private List pcaps = new ArrayList();; + + /** partial response key. */ + private String lastRowKey; + + /** The status. */ + private Status status = Status.COMPLETE; + + /** + * Sets the pcaps. + * + * @param pcaps + * the new pcaps + */ + public void setPcaps(List pcaps) { + this.pcaps = pcaps; + } + + /** + * Adds the pcaps. + * + * @param pcaps + * the pcaps + */ + public void addPcaps(byte[] pcaps) { + this.pcaps.add(pcaps); + } + + /** + * Gets the partial response key. + * + * @return the partial response key + */ + public String getLastRowKey() { + return lastRowKey; + } + + /** + * Sets the partial response key. + * + * @param lastRowKey + * the last row key + */ + public void setLastRowKey(String lastRowKey) { + this.lastRowKey = lastRowKey; + } + + /** + * Gets the status. + * + * @return the status + */ + public Status getStatus() { + return status; + } + + /** + * Sets the status. + * + * @param status + * the new status + */ + public void setStatus(Status status) { + this.status = status; + } + + /** + * Checks if is resonse size within limit. + * + * @param maxResultSize + * the max result size + * @return true, if is resonse size within limit + */ + public boolean isResonseSizeWithinLimit(long maxResultSize) { + // System.out.println("isResonseSizeWithinLimit() : getResponseSize() < (input|default result size - maximum packet size ) ="+ + // getResponseSize()+ " < " + ( maxResultSize + // -ConfigurationUtil.getMaxRowSize())); + return getResponseSize() < (maxResultSize - ConfigurationUtil + .getMaxRowSize()); + } + + /** + * Gets the response size. + * + * @return the response size + */ + public long getResponseSize() { + long responseSize = 0; + for (byte[] pcap : this.pcaps) { + responseSize = responseSize + pcap.length; + } + return responseSize; + } + + /** + * Gets the pcaps. + * + * @return the pcaps + * @throws IOException + * Signals that an I/O exception has occurred. 
+ */ + public byte[] getPcaps() throws IOException { + if (pcaps.size() == 1) { + return pcaps.get(0); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PcapMerger.merge(baos, pcaps); + return baos.toByteArray(); + } + + /* (non-Javadoc) + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return "PcapsResponse [lastRowKey=" + lastRowKey + + ", status=" + status + ", pcapsSize=" + + String.valueOf(getResponseSize()) + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java new file mode 100644 index 0000000..baf6285 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/RestTestingUtil.java @@ -0,0 +1,238 @@ +package com.apache.metron.pcapservice; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestTemplate; + +/** + * The Class RestTestingUtil. + */ +public class RestTestingUtil { + + /** The host name. */ + public static String hostName = null; + + /** + * Gets the pcaps by keys. + * + * @param keys + * the keys + * @return the pcaps by keys + */ + @SuppressWarnings("unchecked") + private static void getPcapsByKeys(String keys) { + System.out + .println("**********************getPcapsByKeys ******************************************************************************************"); + // 1. + String url = "http://" + hostName + + "/cisco-rest/pcapGetter/getPcapsByKeys?keys={keys}" + + "&includeReverseTraffic={includeReverseTraffic}" + + "&startTime={startTime}" + "&endTime={endTime}" + + "&maxResponseSize={maxResponseSize}"; + // default values + String startTime = "-1"; + String endTime = "-1"; + String maxResponseSize = "6"; + String includeReverseTraffic = "false"; + + @SuppressWarnings("rawtypes") + Map map = new HashMap(); + map.put("keys", keys); + map.put("includeReverseTraffic", includeReverseTraffic); + map.put("startTime", startTime); + map.put("endTime", endTime); + map.put("maxResponseSize", maxResponseSize); + + RestTemplate template = new RestTemplate(); + + // set headers and entity to send + HttpHeaders headers = new HttpHeaders(); + headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE); + HttpEntity requestEntity = new HttpEntity(headers); + + // 1. + ResponseEntity response1 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeys : request= \n response= %s \n", + keys, includeReverseTraffic, startTime, endTime, maxResponseSize, + response1); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + // 2. 
with reverse traffic + includeReverseTraffic = "true"; + map.put("includeReverseTraffic", includeReverseTraffic); + ResponseEntity response2 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeys : request= \n response= %s \n", + keys, includeReverseTraffic, startTime, endTime, maxResponseSize, + response2); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + // 3.with time range + startTime = System.getProperty("startTime", "-1"); + endTime = System.getProperty("endTime", "-1"); + map.put("startTime", startTime); + map.put("endTime", endTime); + ResponseEntity response3 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeys : request= \n response= %s \n", + keys, includeReverseTraffic, startTime, endTime, maxResponseSize, + response3); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + // 4.with maxResponseSize + maxResponseSize = System.getProperty("maxResponseSize", "6"); + map.put("maxResponseSize", maxResponseSize); + ResponseEntity response4 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeys : request= \n response= %s \n", + keys, includeReverseTraffic, startTime, endTime, maxResponseSize, + response4); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + } + + /** + * Gets the pcaps by keys range. + * + * @param startKey + * the start key + * @param endKey + * the end key + * @return the pcaps by keys range + */ + @SuppressWarnings("unchecked") + private static void getPcapsByKeysRange(String startKey, String endKey) { + System.out + .println("**********************getPcapsByKeysRange ******************************************************************************************"); + // 1. + String url = "http://" + hostName + + "/cisco-rest/pcapGetter/getPcapsByKeyRange?startKey={startKey}" + + "&endKey={endKey}" + "&startTime={startTime}" + "&endTime={endTime}" + + "&maxResponseSize={maxResponseSize}"; + // default values + String startTime = "-1"; + String endTime = "-1"; + String maxResponseSize = "6"; + @SuppressWarnings("rawtypes") + Map map = new HashMap(); + map.put("startKey", startKey); + map.put("endKey", "endKey"); + map.put("startTime", startTime); + map.put("endTime", endTime); + map.put("maxResponseSize", maxResponseSize); + + RestTemplate template = new RestTemplate(); + + // set headers and entity to send + HttpHeaders headers = new HttpHeaders(); + headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE); + HttpEntity requestEntity = new HttpEntity(headers); + + // 1. 
+ ResponseEntity response1 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeysRange : request= \n response= %s \n", + startKey, endKey, startTime, endTime, maxResponseSize, response1); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + // 2. with time range + startTime = System.getProperty("startTime", "-1"); + endTime = System.getProperty("endTime", "-1"); + map.put("startTime", startTime); + map.put("endTime", endTime); + ResponseEntity response2 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeysRange : request= \n response= %s \n", + startKey, endKey, startTime, endTime, maxResponseSize, response2); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + // 3. with maxResponseSize + maxResponseSize = System.getProperty("maxResponseSize", "6"); + map.put("maxResponseSize", maxResponseSize); + ResponseEntity response3 = template.exchange(url, HttpMethod.GET, + requestEntity, byte[].class, map); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out + .format( + "getPcapsByKeysRange : request= \n response= %s \n", + startKey, endKey, startTime, endTime, maxResponseSize, response3); + System.out + .println("----------------------------------------------------------------------------------------------------"); + System.out.println(); + + } + + /** + * The main method. 
+ * + * @param args + * the arguments + */ + public static void main(String[] args) { + + /* + * Run this program with system properties + * + * -DhostName=mon.hw.com:8090 + * -Dkeys=18800006-1800000b-06-0019-b39d,18800006- + * 1800000b-06-0050-5af6-64840-40785 + * -DstartKey=18000002-18800002-06-0436-0019-2440-34545 + * -DendKey=18000002-18800002-06-b773-0019-2840-34585 + */ + + hostName = System.getProperty("hostName"); + + String keys = System.getProperty("keys"); + + String statyKey = System.getProperty("startKey"); + String endKey = System.getProperty("endKey"); + + getPcapsByKeys(keys); + getPcapsByKeysRange(statyKey, endKey); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java new file mode 100644 index 0000000..83709ea --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/JettyServiceRunner.java @@ -0,0 +1,26 @@ +package com.apache.metron.pcapservice.rest; + +import java.util.HashSet; +import java.util.Set; + +import javax.ws.rs.core.Application; + +import com.apache.metron.pcapservice.PcapReceiverImplRestEasy; + +public class JettyServiceRunner extends Application { + + + private static Set services = new HashSet(); + + public JettyServiceRunner() { + // initialize restful services + services.add(new PcapReceiverImplRestEasy()); + } + @Override + public Set getSingletons() { + return services; + } + public static Set getServices() { + return services; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java new file mode 100644 index 0000000..c8f4a20 --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/pcapservice/rest/PcapService.java @@ -0,0 +1,34 @@ +package com.apache.metron.pcapservice.rest; + +import java.io.IOException; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher; + +import com.apache.metron.helpers.services.PcapServiceCli; + + +public class PcapService { + + public static void main(String[] args) throws IOException { + + PcapServiceCli cli = new PcapServiceCli(args); + cli.parse(); + + Server server = new Server(cli.getPort()); + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + ServletHolder h = new ServletHolder(new HttpServletDispatcher()); + h.setInitParameter("javax.ws.rs.Application", "com.apache.metron.pcapservice.rest.JettyServiceRunner"); + context.addServlet(h, "/*"); + server.setHandler(context); + try { + server.start(); + server.join(); + } catch (Exception 
e) { + e.printStackTrace(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3854e075/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/services/alerts/ElasticSearch_KafkaAlertsService.java ---------------------------------------------------------------------- diff --git a/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/services/alerts/ElasticSearch_KafkaAlertsService.java b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/services/alerts/ElasticSearch_KafkaAlertsService.java new file mode 100644 index 0000000..ee2cf2d --- /dev/null +++ b/metron-streaming/Metron-DataServices/src/main/java/org/apache/metron/services/alerts/ElasticSearch_KafkaAlertsService.java @@ -0,0 +1,88 @@ +package com.apache.metron.services.alerts; + +import javax.inject.Singleton; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.apache.metron.dataservices.common.MetronService; + +@Singleton +public class ElasticSearch_KafkaAlertsService implements MetronService { + + private static final Logger logger = LoggerFactory.getLogger( ElasticSearch_KafkaAlertsService.class ); + + @Override + public String identify() { + // TODO Auto-generated method stub + return "Elastic Search to Kafka Alerts Service"; + } + + @Override + public boolean init(String topicname) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean login() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean registerRulesFromFile() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean registerRules() { + // TODO Auto-generated method stub + return false; + } + + @Override + public String viewRules() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean editRules() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean deleteRules() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean registerForAlertsTopic(String topicname) { + // TODO Auto-generated method stub + return false; + } + + @Override + public String receiveAlertAll() { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean disconnectFromAlertsTopic(String topicname) { + // TODO Auto-generated method stub + return false; + } + + @Override + public String receiveAlertLast() { + // TODO Auto-generated method stub + return null; + } + + +}
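
Editor's note: parseKeys in PcapReceiverImplRestEasy is annotated @VisibleForTesting, so a unit test is the natural companion to it. Below is a minimal sketch of such a test, assuming JUnit 4 on the test classpath; the test class name and sample key strings are illustrative, and the test must sit in the com.apache.metron.pcapservice package (matching the package declared in this commit) because the method is package-private.

package com.apache.metron.pcapservice;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.junit.Test;

public class PcapReceiverImplRestEasyParseKeysTest {

  @Test
  public void splitsCommaSeparatedKeysIntoIndividualEntries() {
    PcapReceiverImplRestEasy endpoint = new PcapReceiverImplRestEasy();
    // two raw query values, one of them carrying two comma-separated keys
    List parsed = endpoint.parseKeys(Arrays.asList("key1,key2", "key3"));
    assertEquals(3, parsed.size());
    assertEquals("key1", parsed.get(0));
    assertEquals("key2", parsed.get(1));
    assertEquals("key3", parsed.get(2));
  }
}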
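Editor's note: the commented-out main method in PcapScannerHBaseImpl hints at how the scanner is meant to be driven. The sketch below uses only the public API shown in this commit (getInstance() and the two-argument getPcaps, which falls back to ConfigurationUtil.getDefaultResultSize() and an unbounded time range). The row keys and output file name are placeholders, and a reachable HBase cluster plus the hbase.table.* properties read by ConfigurationUtil are assumed.

package com.apache.metron.pcapservice;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class PcapScannerExample {

  public static void main(String[] args) throws IOException {
    // placeholder row keys, shaped like the sample keys in RestTestingUtil
    String startKey = "18000002-18800002-06-0436-0019-2440-34545";
    String endKey = "18000002-18800002-06-b773-0019-2840-34585";

    // getInstance() lazily creates the singleton; getPcaps(startKey, endKey)
    // scans the configured table, sorts the cells by timestamp and returns
    // the pcap bytes, merged via PcapMerger when more than one row matches
    byte[] mergedPcaps = PcapScannerHBaseImpl.getInstance()
        .getPcaps(startKey, endKey);

    // write the result to a local pcap file
    OutputStream out = new FileOutputStream("pcaps-example.pcap");
    try {
      out.write(mergedPcaps);
    } finally {
      out.close();
    }
  }
}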
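Editor's note: PcapsResponse is the accumulator the services fill while scanning. A small sketch of its intended use follows: rows are appended, isResonseSizeWithinLimit (the method name is as committed) is consulted before adding more, and when the budget is exhausted the status is flipped to PARTIAL with the last processed row key recorded so a follow-up request can resume. The size cap, the dummy byte array and the resume key are assumptions for illustration, and the size check assumes the properties read by ConfigurationUtil.getMaxRowSize() are available.

package com.apache.metron.pcapservice;

import java.io.IOException;

public class PcapsResponseExample {

  public static void main(String[] args) throws IOException {
    long maxResultSize = 1024 * 1024; // illustrative 1 MB budget

    PcapsResponse response = new PcapsResponse();

    // stand-in for pcap bytes read from one HBase cell
    byte[] rowOne = new byte[512 * 1024];
    response.addPcaps(rowOne);

    if (!response.isResonseSizeWithinLimit(maxResultSize)) {
      // stop early: mark the response partial and remember where to resume
      response.setStatus(PcapsResponse.Status.PARTIAL);
      response.setLastRowKey("18800006-1800000b-06-0019-b39d");
    }

    // with a single row getPcaps() returns it unchanged; with several rows
    // it merges them through PcapMerger into one pcap byte stream
    byte[] payload = response.getPcaps();
    System.out.println(response + ", payloadBytes=" + payload.length);
  }
}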
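Editor's note: RestTestingUtil above exercises the endpoints with hard-coded host names and System properties. As a more compact illustration, the sketch below issues a single getPcapsByKeyRange request with Spring's RestTemplate and saves the octet-stream body; unlike the parameter map in the test utility, the end key here is supplied as a variable. The host, port and context path are placeholders (PcapService binds Jetty to whatever port PcapServiceCli parses, and whether the /cisco-rest prefix applies to the embedded server is an assumption), as are the keys and the maxResponseSize value.

package com.apache.metron.pcapservice;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class PcapRangeClientExample {

  public static void main(String[] args) throws IOException {
    String url = "http://localhost:8081"
        + "/cisco-rest/pcapGetter/getPcapsByKeyRange?startKey={startKey}"
        + "&endKey={endKey}&startTime={startTime}&endTime={endTime}"
        + "&maxResponseSize={maxResponseSize}";

    Map<String, String> params = new HashMap<String, String>();
    params.put("startKey", "18000002-18800002-06-0436-0019-2440-34545");
    params.put("endKey", "18000002-18800002-06-b773-0019-2840-34585");
    params.put("startTime", "-1");  // negative values disable the time filter
    params.put("endTime", "-1");
    params.put("maxResponseSize", "6");

    HttpHeaders headers = new HttpHeaders();
    headers.set("Accept", MediaType.APPLICATION_OCTET_STREAM_VALUE);
    HttpEntity<Void> requestEntity = new HttpEntity<Void>(headers);

    ResponseEntity<byte[]> response = new RestTemplate().exchange(
        url, HttpMethod.GET, requestEntity, byte[].class, params);

    // save the returned pcap payload locally
    OutputStream out = new FileOutputStream("range-example.pcap");
    try {
      out.write(response.getBody());
    } finally {
      out.close();
    }
    System.out.println("HTTP " + response.getStatusCode()
        + ", bytes=" + response.getBody().length);
  }
}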