sdap-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fg...@apache.org
Subject [02/20] incubator-sdap-nexus git commit: Merge branch 'master' into s3
Date Wed, 17 Jan 2018 19:08:40 GMT
Merge branch 'master' into s3

Project: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/commit/be88efc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/tree/be88efc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/diff/be88efc5

Branch: refs/heads/master
Commit: be88efc5aa8ce44c7dff9c27aef93e9d49a2b153
Parents: 0bbf01c
Author: fgreg <fgreguska+github@gmail.com>
Authored: Wed Jan 10 14:08:59 2018 -0800
Committer: Frank Greguska <francis.greguska@jpl.nasa.gov>
Committed: Wed Jan 10 16:00:15 2018 -0800

----------------------------------------------------------------------
 data-access/nexustiles/config/datastores.ini    |  13 +-
 data-access/nexustiles/dao/CassandraProxy.pyx   |   4 +
 data-access/nexustiles/dao/DynamoProxy.pyx      | 135 ++++++++++++
 data-access/nexustiles/dao/S3Proxy.pyx          | 129 ++++++++++++
 data-access/nexustiles/dao/SolrProxy.pyx        |   4 +
 data-access/nexustiles/nexustiles.py            |  68 +++---
 .../developer-box/data/grace/dataseturl.txt     |   2 +-
 nexus-ingest/developer-box/nexus                |   1 +
 nexus-ingest/nexus-sink/build.gradle            |  19 +-
 .../gradle/wrapper/gradle-wrapper.properties    |   2 +-
 nexus-ingest/nexus-sink/nexus-sink.ipr          | 102 +++++++++
 nexus-ingest/nexus-sink/nexus-sink.iws          | 207 +++++++++++++++++++
 .../ingest/nexussink/CassandraStore.groovy      |  40 ++++
 .../nexus/ingest/nexussink/NexusService.groovy  | 152 +-------------
 .../jpl/nexus/ingest/nexussink/SolrStore.groovy | 143 +++++++++++++
 .../jpl/nexus/ingest/nexussink/DataStore.java   |  17 ++
 .../ingest/nexussink/DataStoreException.java    |  24 +++
 .../jpl/nexus/ingest/nexussink/DynamoStore.java |  57 +++++
 .../nexussink/InfrastructureConfiguration.java  | 133 +++++++++---
 .../nexussink/IntegrationConfiguration.java     |  34 +--
 .../nexus/ingest/nexussink/MetadataStore.java   |  17 ++
 .../nexussink/NexusSinkOptionsMetadata.java     | 110 +++++++++-
 .../jpl/nexus/ingest/nexussink/S3Store.java     |  68 ++++++
 .../nexussink/NexusSinkIntegrationTest.groovy   | 138 +++++++++++--
 .../ingest/nexussink/SolrStoreUnitTest.groovy   |  68 ++++++
 .../NexusSinkOptionsIntegrationTest.java        |   9 +-
 .../TestInfrastructureConfiguration.java        |  31 +++
 27 files changed, 1461 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/config/datastores.ini
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/config/datastores.ini b/data-access/nexustiles/config/datastores.ini
index 194760c..e578703 100644
--- a/data-access/nexustiles/config/datastores.ini
+++ b/data-access/nexustiles/config/datastores.ini
@@ -4,6 +4,17 @@ keyspace=nexustiles
 local_datacenter=datacenter1
 protocol_version=3
 
+[s3]
+bucket=nexus-jpl
+region=us-west-2
+
+[dynamo]
+table=nexus-jpl-table
+region=us-west-2
+
 [solr]
 host=localhost:8983
-core=nexustiles
\ No newline at end of file
+core=nexustiles
+
+[datastore]
+store=s3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/CassandraProxy.pyx
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/dao/CassandraProxy.pyx b/data-access/nexustiles/dao/CassandraProxy.pyx
index 8b005f5..c9008b2 100644
--- a/data-access/nexustiles/dao/CassandraProxy.pyx
+++ b/data-access/nexustiles/dao/CassandraProxy.pyx
@@ -1,3 +1,7 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
 import uuid
 from ConfigParser import NoOptionError
 from multiprocessing.synchronize import Lock

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/DynamoProxy.pyx
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/dao/DynamoProxy.pyx b/data-access/nexustiles/dao/DynamoProxy.pyx
new file mode 100644
index 0000000..ca8706f
--- /dev/null
+++ b/data-access/nexustiles/dao/DynamoProxy.pyx
@@ -0,0 +1,135 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import uuid
+import nexusproto.NexusContent_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+import numpy as np
+import boto3
+
+class NexusTileData(object):
+    __nexus_tile = None
+    __data = None
+    tile_id = None
+
+    def __init__(self, data, _tile_id):
+        if self.__data is None:
+            self.__data = data
+        if self.tile_id is None:
+            self.tile_id = _tile_id
+
+    def _get_nexus_tile(self):
+        if self.__nexus_tile is None:
+            self.__nexus_tile = nexusproto.TileData.FromString(self.__data)
+
+        return self.__nexus_tile
+
+    def get_raw_data_array(self):
+
+        nexus_tile = self._get_nexus_tile()
+        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
+
+        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
+
+        return from_shaped_array(the_tile_data.variable_data)
+
+    def get_lat_lon_time_data_meta(self):
+        if self._get_nexus_tile().HasField('grid_tile'):
+            grid_tile = self._get_nexus_tile().grid_tile
+
+            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
+            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
+            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
+
+            if len(grid_tile_data.shape) == 2:
+                grid_tile_data = grid_tile_data[np.newaxis, :]
+
+            # Extract the meta data
+            meta_data = {}
+            for meta_data_obj in grid_tile.meta_data:
+                name = meta_data_obj.name
+                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+                if len(meta_array.shape) == 2:
+                    meta_array = meta_array[np.newaxis, :]
+                meta_data[name] = meta_array
+
+            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
+        elif self._get_nexus_tile().HasField('swath_tile'):
+            swath_tile = self._get_nexus_tile().swath_tile
+
+            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
+            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
+            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
+
+            # Simplify the tile if the time dimension is the same value repeated
+            if np.all(time_data == np.min(time_data)):
+                time_data = np.array([np.min(time_data)])
+
+            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
+
+            tile_data = self._to_standard_index(swath_tile_data,
+                                                (len(time_data), len(latitude_data), len(longitude_data)))
+
+            # Extract the meta data
+            meta_data = {}
+            for meta_data_obj in swath_tile.meta_data:
+                name = meta_data_obj.name
+                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
+                meta_data[name] = reshaped_meta_array
+
+            return latitude_data, longitude_data, time_data, tile_data, meta_data
+        else:
+            raise NotImplementedError("Only supports grid_tile and swath_tile")
+
+    @staticmethod
+    def _to_standard_index(data_array, desired_shape):
+
+        if desired_shape[0] == 1:
+            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
+            row, col = np.indices(data_array.shape)
+
+            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
+                row.flat, col.flat]
+            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
+                row.flat, col.flat]
+            reshaped_array = reshaped_array[np.newaxis, :]
+        else:
+            reshaped_array = np.ma.masked_all(desired_shape)
+            row, col = np.indices(data_array.shape)
+
+            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
+                row.flat, col.flat]
+            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
+                row.flat, col.flat]
+
+        return reshaped_array
+
+
+class DynamoProxy(object):
+    def __init__(self, config):
+        self.config = config
+        self.__dynamo_tablename = config.get("dynamo", "table")
+        self.__dynamo_region = config.get("dynamo", "region")
+        self.__dynamo = boto3.resource('dynamodb', region_name=self.__dynamo_region)
+        self.__dynamo_table = self.__dynamo.Table(self.__dynamo_tablename)
+        self.__nexus_tile = None
+
+    def fetch_nexus_tiles(self, *tile_ids):
+
+        tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
+                    (isinstance(tile_id, str) or isinstance(tile_id, unicode))]
+        res = []
+        for tile_id in tile_ids:
+            response = self.__dynamo_table.get_item(
+                Key = {
+                    'tile_id': str(tile_id)
+                }
+            )
+            item = response['Item']
+            data = item['data'].__str__()
+            nexus_tile = NexusTileData(data, str(tile_id))
+            res.append(nexus_tile)
+
+        return res
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/S3Proxy.pyx
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/dao/S3Proxy.pyx b/data-access/nexustiles/dao/S3Proxy.pyx
new file mode 100644
index 0000000..d3b1a84
--- /dev/null
+++ b/data-access/nexustiles/dao/S3Proxy.pyx
@@ -0,0 +1,129 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import uuid
+import nexusproto.NexusContent_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+import numpy as np
+import boto3
+
+class NexusTileData(object):
+    __nexus_tile = None
+    __data = None
+    tile_id = None
+
+    def __init__(self, data, _tile_id):
+        if self.__data is None:
+            self.__data = data
+        if self.tile_id is None:
+            self.tile_id = _tile_id
+
+    def _get_nexus_tile(self):
+        if self.__nexus_tile is None:
+            self.__nexus_tile = nexusproto.TileData.FromString(self.__data)
+
+        return self.__nexus_tile
+
+    def get_raw_data_array(self):
+
+        nexus_tile = self._get_nexus_tile()
+        the_tile_type = nexus_tile.tile.WhichOneof("tile_type")
+
+        the_tile_data = getattr(nexus_tile.tile, the_tile_type)
+
+        return from_shaped_array(the_tile_data.variable_data)
+
+    def get_lat_lon_time_data_meta(self):
+        if self._get_nexus_tile().HasField('grid_tile'):
+            grid_tile = self._get_nexus_tile().grid_tile
+
+            grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data))
+            latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude))
+            longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude))
+
+            if len(grid_tile_data.shape) == 2:
+                grid_tile_data = grid_tile_data[np.newaxis, :]
+
+            # Extract the meta data
+            meta_data = {}
+            for meta_data_obj in grid_tile.meta_data:
+                name = meta_data_obj.name
+                meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+                if len(meta_array.shape) == 2:
+                    meta_array = meta_array[np.newaxis, :]
+                meta_data[name] = meta_array
+
+            return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data
+        elif self._get_nexus_tile().HasField('swath_tile'):
+            swath_tile = self._get_nexus_tile().swath_tile
+
+            latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1)
+            longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1)
+            time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1)
+
+            # Simplify the tile if the time dimension is the same value repeated
+            if np.all(time_data == np.min(time_data)):
+                time_data = np.array([np.min(time_data)])
+
+            swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data))
+
+            tile_data = self._to_standard_index(swath_tile_data,
+                                                (len(time_data), len(latitude_data), len(longitude_data)))
+
+            # Extract the meta data
+            meta_data = {}
+            for meta_data_obj in swath_tile.meta_data:
+                name = meta_data_obj.name
+                actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data))
+                reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape)
+                meta_data[name] = reshaped_meta_array
+
+            return latitude_data, longitude_data, time_data, tile_data, meta_data
+        else:
+            raise NotImplementedError("Only supports grid_tile and swath_tile")
+
+    @staticmethod
+    def _to_standard_index(data_array, desired_shape):
+
+        if desired_shape[0] == 1:
+            reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2]))
+            row, col = np.indices(data_array.shape)
+
+            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
+                row.flat, col.flat]
+            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
+                row.flat, col.flat]
+            reshaped_array = reshaped_array[np.newaxis, :]
+        else:
+            reshaped_array = np.ma.masked_all(desired_shape)
+            row, col = np.indices(data_array.shape)
+
+            reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[
+                row.flat, col.flat]
+            reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[
+                row.flat, col.flat]
+
+        return reshaped_array
+
+
+class S3Proxy(object):
+    def __init__(self, config):
+        self.config = config
+        self.__s3_bucketname = config.get("s3", "bucket")
+        self.__s3_region = config.get("s3", "region")
+        self.__s3 = boto3.resource('s3')
+        self.__nexus_tile = None
+
+    def fetch_nexus_tiles(self, *tile_ids):
+
+        tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if
+                    (isinstance(tile_id, str) or isinstance(tile_id, unicode))]
+        res = []
+        for tile_id in tile_ids:
+            obj = self.__s3.Object(self.__s3_bucketname, str(tile_id))
+            data = obj.get()['Body'].read()
+            nexus_tile = NexusTileData(data, str(tile_id))
+            res.append(nexus_tile)
+
+        return res
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/SolrProxy.pyx
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/dao/SolrProxy.pyx b/data-access/nexustiles/dao/SolrProxy.pyx
index 0d775e6..434de4b 100644
--- a/data-access/nexustiles/dao/SolrProxy.pyx
+++ b/data-access/nexustiles/dao/SolrProxy.pyx
@@ -1,3 +1,7 @@
+"""
+Copyright (c) 2017 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
 import json
 import logging
 import threading

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/nexustiles.py
----------------------------------------------------------------------
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index e97ecf6..af18f96 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -3,18 +3,20 @@ Copyright (c) 2016 Jet Propulsion Laboratory,
 California Institute of Technology.  All rights reserved
 """
 import ConfigParser
+import sys
 from datetime import datetime
 from functools import wraps
 
 import numpy as np
 import numpy.ma as ma
 import pkg_resources
-import sys
+from dao.CassandraProxy import CassandraProxy
+from dao.S3Proxy import S3Proxy
+from dao.DynamoProxy import DynamoProxy
+from dao.SolrProxy import SolrProxy
 from pytz import timezone
 from shapely.geometry import MultiPolygon, box
 
-from dao.CassandraProxy import CassandraProxy
-from dao.SolrProxy import SolrProxy
 from model.nexusmodel import Tile, BBox, TileStats
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -46,7 +48,10 @@ class NexusTileServiceException(Exception):
 
 
 class NexusTileService(object):
-    def __init__(self, skipCassandra=False, skipSolr=False, config=None):
+    def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None):
+        self._datastore = None
+        self._metadatastore = None
+
         if config is None:
             self._config = ConfigParser.RawConfigParser()
             self._config.readfp(pkg_resources.resource_stream(__name__, "config/datastores.ini"),
@@ -54,28 +59,36 @@ class NexusTileService(object):
         else:
             self._config = config
 
-        if not skipCassandra:
-            self._cass = CassandraProxy(self._config)
+        if not skipDatastore:
+            datastore = self._config.get("datastore", "store")
+            if datastore == "cassandra":
+                self._datastore = CassandraProxy(self._config)
+            elif datastore == "s3":
+                self._datastore = S3Proxy(self._config)
+            elif datastore == "dynamo":
+                self._datastore = DynamoProxy(self._config)
+            else:
+                raise ValueError("Error reading datastore from config file")
 
-        if not skipSolr:
-            self._solr = SolrProxy(self._config)
+        if not skipMetadatastore:
+            self._metadatastore = SolrProxy(self._config)
 
     def get_dataseries_list(self, simple=False):
         if simple:
-            return self._solr.get_data_series_list_simple()
+            return self._metadatastore.get_data_series_list_simple()
         else:
-            return self._solr.get_data_series_list()
+            return self._metadatastore.get_data_series_list()
 
     @tile_data()
     def find_tile_by_id(self, tile_id, **kwargs):
-        return self._solr.find_tile_by_id(tile_id)
+        return self._metadatastore.find_tile_by_id(tile_id)
 
     @tile_data()
     def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
-        return self._solr.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
+        return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs)
 
     def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, **kwargs):
-        return self._solr.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
+        return self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time,
                                                  **kwargs)
 
     @tile_data()
@@ -103,7 +116,7 @@ class NexusTileService(object):
         :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found
         """
         try:
-            tile = self._solr.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year)
+            tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year)
         except IndexError:
             raise NexusTileServiceException("No tile found."), None, sys.exc_info()[2]
 
@@ -111,27 +124,27 @@ class NexusTileService(object):
 
     @tile_data()
     def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._solr.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
+        return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
                                                         **kwargs)
 
     @tile_data()
     def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs):
-        return self._solr.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
+        return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000,
                                                             **kwargs)
 
     @tile_data()
     def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs):
         # Find tiles that fall in the given box in the Solr index
-        return self._solr.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
+        return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time,
                                                             end_time, **kwargs)
 
     @tile_data()
     def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs):
         # Find tiles that fall within the polygon in the Solr index
         if 'sort' in kwargs.keys():
-            tiles = self._solr.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
+            tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs)
         else:
-            tiles = self._solr.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time,
+            tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time,
                                                                      **kwargs)
         return tiles
 
@@ -149,13 +162,13 @@ class NexusTileService(object):
         :param kwargs: fetch_data: True/False = whether or not to retrieve tile data
         :return:
         """
-        tiles = self._solr.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time,
+        tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time,
                                                       end_time)
         return tiles
 
     @tile_data()
     def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        return self._solr.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
+        return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000,
                                                           **kwargs)
 
     def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1,
@@ -190,7 +203,7 @@ class NexusTileService(object):
         return tiles
 
     def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs):
-        tiles = self._solr.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
+        tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time,
                                                              **kwargs)
 
         return tiles
@@ -215,7 +228,7 @@ class NexusTileService(object):
         :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
         :return: long time in seconds since epoch
         """
-        min_time = self._solr.find_min_date_from_tiles(tile_ids, ds=ds)
+        min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds)
         return long((min_time - EPOCH).total_seconds())
 
     def get_max_time(self, tile_ids, ds=None):
@@ -225,7 +238,7 @@ class NexusTileService(object):
         :param ds: Filter by a specific dataset. Defaults to None (queries all datasets)
         :return: long time in seconds since epoch
         """
-        max_time = self._solr.find_max_date_from_tiles(tile_ids, ds=ds)
+        max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds)
         return long((max_time - EPOCH).total_seconds())
 
     def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time):
@@ -237,7 +250,7 @@ class NexusTileService(object):
         :param end_time: The end time to search for tiles
         :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon
         """
-        bounds = self._solr.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time)
+        bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time)
         return [box(*b) for b in bounds]
 
     def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles):
@@ -279,7 +292,8 @@ class NexusTileService(object):
     def fetch_data_for_tiles(self, *tiles):
 
         nexus_tile_ids = set([tile.tile_id for tile in tiles])
-        matched_tile_data = self._cass.fetch_nexus_tiles(*nexus_tile_ids)
+        matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids)
+
         tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data}
 
         missing_data = nexus_tile_ids.difference(tile_data_by_id.keys())
@@ -359,7 +373,7 @@ class NexusTileService(object):
         return tiles
 
     def pingSolr(self):
-        status = self._solr.ping()
+        status = self._metadatastore.ping()
         if status and status["status"] == "OK":
             return True
         else:

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/developer-box/data/grace/dataseturl.txt
----------------------------------------------------------------------
diff --git a/nexus-ingest/developer-box/data/grace/dataseturl.txt b/nexus-ingest/developer-box/data/grace/dataseturl.txt
index c4cdb3b..d28abeb 100644
--- a/nexus-ingest/developer-box/data/grace/dataseturl.txt
+++ b/nexus-ingest/developer-box/data/grace/dataseturl.txt
@@ -1 +1 @@
-https://podaac.jpl.nasa.gov/dataset/TELLUS_GRACE_MASCON_GRID_RL05_V2
+https://podaac.jpl.nasa.gov/dataset/TELLUS_GRACE_MASCON_GRID_RL05_V1

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/developer-box/nexus
----------------------------------------------------------------------
diff --git a/nexus-ingest/developer-box/nexus b/nexus-ingest/developer-box/nexus
new file mode 160000
index 0000000..3e1b45f
--- /dev/null
+++ b/nexus-ingest/developer-box/nexus
@@ -0,0 +1 @@
+Subproject commit 3e1b45fbb7b326911aace8f20f2ee077e1994591

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/build.gradle
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/build.gradle b/nexus-ingest/nexus-sink/build.gradle
index 7807398..5f31265 100644
--- a/nexus-ingest/nexus-sink/build.gradle
+++ b/nexus-ingest/nexus-sink/build.gradle
@@ -1,5 +1,6 @@
 buildscript {
     repositories {
+        mavenCentral()
         if( project.hasProperty('artifactory_contextUrl') ) {
             maven {
                 url "${artifactory_contextUrl}"
@@ -31,6 +32,7 @@ buildscript {
     dependencies {
         classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4+"
         classpath("org.springframework.xd:spring-xd-module-plugin:1.3.1.RELEASE")
+        classpath "io.spring.gradle:dependency-management-plugin:1.0.0.RC2"
     }
 }
 
@@ -107,13 +109,12 @@ ext {
     ]
 }
 
-
-
 apply plugin: 'java'
 apply plugin: 'groovy'
 apply plugin: 'idea'
 apply plugin: 'spring-xd-module'
 apply plugin: 'maven-publish'
+apply plugin: "io.spring.dependency-management"
 apply plugin: 'project-report'
 
 group = 'org.nasa.jpl.nexus.ingest'
@@ -145,6 +146,12 @@ sourceSets {
     }
 }
 
+dependencyManagement {
+    imports {
+        mavenBom 'com.amazonaws:aws-java-sdk-bom:1.10.77+'
+    }
+}
+
 //noinspection GroovyAssignabilityCheck
 dependencies {
     compile("org.springframework.boot:spring-boot-starter-integration")
@@ -157,8 +164,10 @@ dependencies {
     compile "org.apache.solr:solr-solrj:5.3.1"
     compile 'org.codehaus.groovy:groovy'
 
+    compile 'com.amazonaws:aws-java-sdk-s3'
+    compile 'com.amazonaws:aws-java-sdk-dynamodb'
+    testCompile 'io.findify:s3mock_2.12:0.2.3'
 
-    testCompile group: 'io.findify', name: 's3mock_2.12', version: '0.2.3'
     testCompile('org.springframework.boot:spring-boot-starter-test')
     testCompile("org.apache.cassandra:cassandra-all")
     testCompile("org.apache.solr:solr-core:${testversions.solrCoreVersion}"){
@@ -171,6 +180,10 @@ dependencies {
     testCompile group: 'junit', name: 'junit', version: '4.11'
 }
 
+task buildScriptDependencies(type: org.gradle.api.tasks.diagnostics.DependencyReportTask) {
+    configurations = project.buildscript.configurations
+}
+
 task wrapper(type: Wrapper) {
     gradleVersion = '2.12'
 }

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties b/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties
index 98d7456..e90496d 100644
--- a/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties
+++ b/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties
@@ -1,4 +1,4 @@
-#Thu Aug 11 13:56:54 PDT 2016
+#Wed Jun 28 16:08:35 PDT 2017
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/nexus-sink.ipr
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/nexus-sink.ipr b/nexus-ingest/nexus-sink/nexus-sink.ipr
new file mode 100644
index 0000000..57dbab0
--- /dev/null
+++ b/nexus-ingest/nexus-sink/nexus-sink.ipr
@@ -0,0 +1,102 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="CompilerConfiguration">
+    <option name="DEFAULT_COMPILER" value="Javac"/>
+    <resourceExtensions>
+      <entry name=".+\.(properties|xml|html|dtd|tld)"/>
+      <entry name=".+\.(gif|png|jpeg|jpg)"/>
+    </resourceExtensions>
+    <wildcardResourcePatterns>
+      <entry name="!?*.java"/>
+      <entry name="!?*.groovy"/>
+    </wildcardResourcePatterns>
+    <annotationProcessing enabled="false" useClasspath="true"/>
+    <bytecodeTargetLevel target="1.8"/>
+  </component>
+  <component name="CopyrightManager" default="">
+    <module2copyright/>
+  </component>
+  <component name="DependencyValidationManager">
+    <option name="SKIP_IMPORT_STATEMENTS" value="false"/>
+  </component>
+  <component name="Encoding" useUTFGuessing="true" native2AsciiForPropertiesFiles="false"/>
+  <component name="GradleUISettings">
+    <setting name="root"/>
+  </component>
+  <component name="GradleUISettings2">
+    <setting name="root"/>
+  </component>
+  <component name="IdProvider" IDEtalkID="11DA1DB66DD62DDA1ED602B7079FE97C"/>
+  <component name="JavadocGenerationManager">
+    <option name="OUTPUT_DIRECTORY"/>
+    <option name="OPTION_SCOPE" value="protected"/>
+    <option name="OPTION_HIERARCHY" value="true"/>
+    <option name="OPTION_NAVIGATOR" value="true"/>
+    <option name="OPTION_INDEX" value="true"/>
+    <option name="OPTION_SEPARATE_INDEX" value="true"/>
+    <option name="OPTION_DOCUMENT_TAG_USE" value="false"/>
+    <option name="OPTION_DOCUMENT_TAG_AUTHOR" value="false"/>
+    <option name="OPTION_DOCUMENT_TAG_VERSION" value="false"/>
+    <option name="OPTION_DOCUMENT_TAG_DEPRECATED" value="true"/>
+    <option name="OPTION_DEPRECATED_LIST" value="true"/>
+    <option name="OTHER_OPTIONS" value=""/>
+    <option name="HEAP_SIZE"/>
+    <option name="LOCALE"/>
+    <option name="OPEN_IN_BROWSER" value="true"/>
+  </component>
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/nexus-sink.iml" filepath="$PROJECT_DIR$/nexus-sink.iml"/>
+    </modules>
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" assert-keyword="true" jdk-15="true" project-jdk-type="JavaSDK" assert-jdk-15="true" project-jdk-name="1.8">
+    <output url="file://$PROJECT_DIR$/out"/>
+  </component>
+  <component name="SvnBranchConfigurationManager">
+    <option name="mySupportsUserInfoFilter" value="true"/>
+  </component>
+  <component name="VcsDirectoryMappings">
+    <mapping directory="" vcs=""/>
+  </component>
+  <component name="masterDetails">
+    <states>
+      <state key="ArtifactsStructureConfigurable.UI">
+        <UIState>
+          <splitter-proportions>
+            <SplitterProportionsDataImpl/>
+          </splitter-proportions>
+          <settings/>
+        </UIState>
+      </state>
+      <state key="Copyright.UI">
+        <UIState>
+          <splitter-proportions>
+            <SplitterProportionsDataImpl/>
+          </splitter-proportions>
+        </UIState>
+      </state>
+      <state key="ProjectJDKs.UI">
+        <UIState>
+          <splitter-proportions>
+            <SplitterProportionsDataImpl>
+              <option name="proportions">
+                <list>
+                  <option value="0.2"/>
+                </list>
+              </option>
+            </SplitterProportionsDataImpl>
+          </splitter-proportions>
+          <last-edited>1.6</last-edited>
+        </UIState>
+      </state>
+      <state key="ScopeChooserConfigurable.UI">
+        <UIState>
+          <splitter-proportions>
+            <SplitterProportionsDataImpl/>
+          </splitter-proportions>
+          <settings/>
+        </UIState>
+      </state>
+    </states>
+  </component>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/nexus-sink.iws
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/nexus-sink.iws b/nexus-ingest/nexus-sink/nexus-sink.iws
new file mode 100644
index 0000000..d5bc759
--- /dev/null
+++ b/nexus-ingest/nexus-sink/nexus-sink.iws
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ChangeListManager">
+    <option name="TRACKING_ENABLED" value="true"/>
+    <option name="SHOW_DIALOG" value="false"/>
+    <option name="HIGHLIGHT_CONFLICTS" value="true"/>
+    <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false"/>
+    <option name="LAST_RESOLUTION" value="IGNORE"/>
+  </component>
+  <component name="ChangesViewManager" flattened_view="true" show_ignored="false"/>
+  <component name="CreatePatchCommitExecutor">
+    <option name="PATCH_PATH" value=""/>
+    <option name="REVERSE_PATCH" value="false"/>
+  </component>
+  <component name="DaemonCodeAnalyzer">
+    <disable_hints/>
+  </component>
+  <component name="DebuggerManager">
+    <breakpoint_any>
+      <breakpoint>
+        <option name="NOTIFY_CAUGHT" value="true"/>
+        <option name="NOTIFY_UNCAUGHT" value="true"/>
+        <option name="ENABLED" value="false"/>
+        <option name="LOG_ENABLED" value="false"/>
+        <option name="LOG_EXPRESSION_ENABLED" value="false"/>
+        <option name="SUSPEND_POLICY" value="SuspendAll"/>
+        <option name="COUNT_FILTER_ENABLED" value="false"/>
+        <option name="COUNT_FILTER" value="0"/>
+        <option name="CONDITION_ENABLED" value="false"/>
+        <option name="CLASS_FILTERS_ENABLED" value="false"/>
+        <option name="INSTANCE_FILTERS_ENABLED" value="false"/>
+        <option name="CONDITION" value=""/>
+        <option name="LOG_MESSAGE" value=""/>
+      </breakpoint>
+      <breakpoint>
+        <option name="NOTIFY_CAUGHT" value="true"/>
+        <option name="NOTIFY_UNCAUGHT" value="true"/>
+        <option name="ENABLED" value="false"/>
+        <option name="LOG_ENABLED" value="false"/>
+        <option name="LOG_EXPRESSION_ENABLED" value="false"/>
+        <option name="SUSPEND_POLICY" value="SuspendAll"/>
+        <option name="COUNT_FILTER_ENABLED" value="false"/>
+        <option name="COUNT_FILTER" value="0"/>
+        <option name="CONDITION_ENABLED" value="false"/>
+        <option name="CLASS_FILTERS_ENABLED" value="false"/>
+        <option name="INSTANCE_FILTERS_ENABLED" value="false"/>
+        <option name="CONDITION" value=""/>
+        <option name="LOG_MESSAGE" value=""/>
+      </breakpoint>
+    </breakpoint_any>
+    <breakpoint_rules/>
+    <ui_properties/>
+  </component>
+  <component name="ModuleEditorState">
+    <option name="LAST_EDITED_MODULE_NAME"/>
+    <option name="LAST_EDITED_TAB_NAME"/>
+  </component>
+  <component name="ProjectInspectionProfilesVisibleTreeState">
+    <entry key="Project Default">
+      <profile-state/>
+    </entry>
+  </component>
+  <component name="ProjectLevelVcsManager">
+    <OptionsSetting value="true" id="Add"/>
+    <OptionsSetting value="true" id="Remove"/>
+    <OptionsSetting value="true" id="Checkout"/>
+    <OptionsSetting value="true" id="Update"/>
+    <OptionsSetting value="true" id="Status"/>
+    <OptionsSetting value="true" id="Edit"/>
+    <ConfirmationsSetting value="0" id="Add"/>
+    <ConfirmationsSetting value="0" id="Remove"/>
+  </component>
+  <component name="ProjectReloadState">
+    <option name="STATE" value="0"/>
+  </component>
+  <component name="PropertiesComponent">
+    <property name="GoToFile.includeJavaFiles" value="false"/>
+    <property name="GoToClass.toSaveIncludeLibraries" value="false"/>
+    <property name="MemberChooser.sorted" value="false"/>
+    <property name="MemberChooser.showClasses" value="true"/>
+    <property name="GoToClass.includeLibraries" value="false"/>
+    <property name="MemberChooser.copyJavadoc" value="false"/>
+  </component>
+  <component name="RunManager">
+    <configuration default="true" type="Remote" factoryName="Remote">
+      <option name="USE_SOCKET_TRANSPORT" value="true"/>
+      <option name="SERVER_MODE" value="false"/>
+      <option name="SHMEM_ADDRESS" value="javadebug"/>
+      <option name="HOST" value="localhost"/>
+      <option name="PORT" value="5005"/>
+      <method>
+        <option name="BuildArtifacts" enabled="false"/>
+      </method>
+    </configuration>
+    <configuration default="true" type="Applet" factoryName="Applet">
+      <module name=""/>
+      <option name="MAIN_CLASS_NAME"/>
+      <option name="HTML_FILE_NAME"/>
+      <option name="HTML_USED" value="false"/>
+      <option name="WIDTH" value="400"/>
+      <option name="HEIGHT" value="300"/>
+      <option name="POLICY_FILE" value="$APPLICATION_HOME_DIR$/bin/appletviewer.policy"/>
+      <option name="VM_PARAMETERS"/>
+      <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/>
+      <option name="ALTERNATIVE_JRE_PATH"/>
+      <method>
+        <option name="BuildArtifacts" enabled="false"/>
+        <option name="Make" enabled="true"/>
+      </method>
+    </configuration>
+    <configuration default="true" type="Application" factoryName="Application">
+      <extension name="coverage" enabled="false" merge="false"/>
+      <option name="MAIN_CLASS_NAME"/>
+      <option name="VM_PARAMETERS"/>
+      <option name="PROGRAM_PARAMETERS"/>
+      <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$"/>
+      <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/>
+      <option name="ALTERNATIVE_JRE_PATH"/>
+      <option name="ENABLE_SWING_INSPECTOR" value="false"/>
+      <option name="ENV_VARIABLES"/>
+      <option name="PASS_PARENT_ENVS" value="true"/>
+      <module name=""/>
+      <envs/>
+      <method>
+        <option name="BuildArtifacts" enabled="false"/>
+        <option name="Make" enabled="true"/>
+      </method>
+    </configuration>
+    <configuration default="true" type="JUnit" factoryName="JUnit">
+      <extension name="coverage" enabled="false" merge="false"/>
+      <module name=""/>
+      <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/>
+      <option name="ALTERNATIVE_JRE_PATH"/>
+      <option name="PACKAGE_NAME"/>
+      <option name="MAIN_CLASS_NAME"/>
+      <option name="METHOD_NAME"/>
+      <option name="TEST_OBJECT" value="class"/>
+      <option name="VM_PARAMETERS"/>
+      <option name="PARAMETERS"/>
+      <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$"/>
+      <option name="ENV_VARIABLES"/>
+      <option name="PASS_PARENT_ENVS" value="true"/>
+      <option name="TEST_SEARCH_SCOPE">
+        <value defaultName="moduleWithDependencies"/>
+      </option>
+      <envs/>
+      <method>
+        <option name="BuildArtifacts" enabled="false"/>
+        <option name="Make" enabled="true"/>
+      </method>
+    </configuration>
+    <list size="0"/>
+    <configuration name="&lt;template&gt;" type="WebApp" default="true" selected="false">
+      <Host>localhost</Host>
+      <Port>5050</Port>
+    </configuration>
+  </component>
+  <component name="ShelveChangesManager" show_recycled="false"/>
+  <component name="SvnConfiguration" maxAnnotateRevisions="500">
+    <option name="USER" value=""/>
+    <option name="PASSWORD" value=""/>
+    <option name="LAST_MERGED_REVISION"/>
+    <option name="UPDATE_RUN_STATUS" value="false"/>
+    <option name="MERGE_DRY_RUN" value="false"/>
+    <option name="MERGE_DIFF_USE_ANCESTRY" value="true"/>
+    <option name="UPDATE_LOCK_ON_DEMAND" value="false"/>
+    <option name="IGNORE_SPACES_IN_MERGE" value="false"/>
+    <option name="DETECT_NESTED_COPIES" value="true"/>
+    <option name="IGNORE_SPACES_IN_ANNOTATE" value="true"/>
+    <option name="SHOW_MERGE_SOURCES_IN_ANNOTATE" value="true"/>
+    <myIsUseDefaultProxy>false</myIsUseDefaultProxy>
+  </component>
+  <component name="TaskManager">
+    <task active="true" id="Default" summary="Default task"/>
+    <servers/>
+  </component>
+  <component name="VcsManagerConfiguration">
+    <option name="OFFER_MOVE_TO_ANOTHER_CHANGELIST_ON_PARTIAL_COMMIT" value="true"/>
+    <option name="CHECK_CODE_SMELLS_BEFORE_PROJECT_COMMIT" value="true"/>
+    <option name="PERFORM_UPDATE_IN_BACKGROUND" value="true"/>
+    <option name="PERFORM_COMMIT_IN_BACKGROUND" value="true"/>
+    <option name="PERFORM_EDIT_IN_BACKGROUND" value="true"/>
+    <option name="PERFORM_CHECKOUT_IN_BACKGROUND" value="true"/>
+    <option name="PERFORM_ADD_REMOVE_IN_BACKGROUND" value="true"/>
+    <option name="PERFORM_ROLLBACK_IN_BACKGROUND" value="false"/>
+    <option name="CHECK_LOCALLY_CHANGED_CONFLICTS_IN_BACKGROUND" value="false"/>
+    <option name="ENABLE_BACKGROUND_PROCESSES" value="false"/>
+    <option name="CHANGED_ON_SERVER_INTERVAL" value="60"/>
+    <option name="FORCE_NON_EMPTY_COMMENT" value="false"/>
+    <option name="LAST_COMMIT_MESSAGE"/>
+    <option name="MAKE_NEW_CHANGELIST_ACTIVE" value="true"/>
+    <option name="OPTIMIZE_IMPORTS_BEFORE_PROJECT_COMMIT" value="false"/>
+    <option name="CHECK_FILES_UP_TO_DATE_BEFORE_COMMIT" value="false"/>
+    <option name="REFORMAT_BEFORE_PROJECT_COMMIT" value="false"/>
+    <option name="REFORMAT_BEFORE_FILE_COMMIT" value="false"/>
+    <option name="FILE_HISTORY_DIALOG_COMMENTS_SPLITTER_PROPORTION" value="0.8"/>
+    <option name="FILE_HISTORY_DIALOG_SPLITTER_PROPORTION" value="0.5"/>
+    <option name="ACTIVE_VCS_NAME"/>
+    <option name="UPDATE_GROUP_BY_PACKAGES" value="false"/>
+    <option name="UPDATE_GROUP_BY_CHANGELIST" value="false"/>
+    <option name="SHOW_FILE_HISTORY_AS_TREE" value="false"/>
+    <option name="FILE_HISTORY_SPLITTER_PROPORTION" value="0.6"/>
+  </component>
+  <component name="XDebuggerManager">
+    <breakpoint-manager/>
+  </component>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy
new file mode 100644
index 0000000..3db1995
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy
@@ -0,0 +1,40 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile
+import org.springframework.data.cassandra.core.CassandraOperations
+import java.nio.ByteBuffer
+
+/**
+ * Created by djsilvan on 6/27/17.
+ */
+class CassandraStore implements DataStore {
+
+    private CassandraOperations cassandraTemplate
+
+    //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group.
+    private String tableName = "sea_surface_temp"
+
+    public CassandraStore(CassandraOperations cassandraTemplate) {
+        this.cassandraTemplate = cassandraTemplate
+    }
+
+    @Override
+    void saveData(Collection<NexusTile> nexusTiles) {
+
+        def query = "insert into ${tableName} (tile_id, tile_blob) VALUES (?, ?)"
+        cassandraTemplate.ingest(query, nexusTiles.collect { nexusTile -> getCassandraRowFromTileData(nexusTile.tile) })
+    }
+
+    def getCassandraRowFromTileData(NexusContent.TileData tile) {
+
+        def tileId = UUID.fromString(tile.tileId)
+        def row = [tileId, ByteBuffer.wrap(tile.toByteArray())]
+        return row
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy
index 17f3190..8eae9d2 100644
--- a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy
+++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy
@@ -4,160 +4,24 @@
  *****************************************************************************/
 package org.nasa.jpl.nexus.ingest.nexussink
 
-import org.apache.solr.common.SolrInputDocument
-import org.apache.solr.common.SolrInputField
 import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent
-import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile
-import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.TileSummary
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-import org.springframework.core.env.Environment
-import org.springframework.data.cassandra.core.CassandraOperations
-import org.springframework.data.solr.core.SolrOperations
-
-import javax.annotation.Resource
-import java.nio.ByteBuffer
-import java.text.SimpleDateFormat
 
 /**
  * Created by greguska on 4/4/16.
  */
 class NexusService {
 
-    private Environment environment;
-
-    private Logger log = LoggerFactory.getLogger(NexusService.class)
-
-    private static final def iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
-    static {
-        iso.setTimeZone(TimeZone.getTimeZone("UTC"))
-    }
-
-    private SolrOperations solr
-    private CassandraOperations cassandraTemplate
-
-    //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group.
-    private String tableName = "sea_surface_temp"
-
-    public NexusService(SolrOperations solr, CassandraOperations cassandraTemplate) {
-        this.solr = solr
-        this.cassandraTemplate = cassandraTemplate
-    }
-
-    @Resource
-    void setEnvironment(Environment environment) {
-        this.environment = environment
-    }
-
-    def saveToNexus(Collection<NexusTile> nexusTiles) {
-
-        def solrdocs = nexusTiles.collect { nexusTile -> getSolrDocFromTileSummary(nexusTile.summary) }
-        solr.saveDocuments(solrdocs, environment.getProperty("solrCommitWithin", Integer.class, 1000))
-
-        def query = "insert into ${tableName} (tile_id, tile_blob) VALUES (?, ?)"
-        cassandraTemplate.ingest(query, nexusTiles.collect { nexusTile -> getCassandraRowFromTileData(nexusTile.tile) })
-
-    }
-
-    def getSolrDocFromTileSummary(TileSummary summary) {
-
-        def bbox = summary.getBbox()
-        def stats = summary.getStats()
-
-        def startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
-        startCal.setTime(new Date(stats.minTime * 1000))
-        def endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
-        endCal.setTime(new Date(stats.maxTime * 1000))
-
-        def minTime = iso.format(startCal.getTime())
-        def maxTime = iso.format(endCal.getTime())
+    private MetadataStore metadataStore
+    private DataStore dataStore
 
-        def geo = determineGeo(summary)
-
-        def doc = [
-                "table_s"         : tableName,
-                "geo"             : geo,
-                "id"              : "$summary.tileId".toString(),
-                "solr_id_s"       : "${summary.datasetName}!${summary.tileId}".toString(),
-                "dataset_id_s"    : "$summary.datasetUuid".toString(),
-                "sectionSpec_s"   : "$summary.sectionSpec".toString(),
-                "dataset_s"       : "$summary.datasetName".toString(),
-                "granule_s"       : "$summary.granule".toString(),
-                "tile_var_name_s" : "$summary.dataVarName".toString(),
-                "tile_min_lon"    : bbox.lonMin,
-                "tile_max_lon"    : bbox.lonMax,
-                "tile_min_lat"    : bbox.latMin,
-                "tile_max_lat"    : bbox.latMax,
-                "tile_min_time_dt": minTime,
-                "tile_max_time_dt": maxTime,
-                "tile_min_val_d"  : stats.min,
-                "tile_max_val_d"  : stats.max,
-                "tile_avg_val_d"  : stats.mean,
-                "tile_count_i"    : stats.count
-        ]
-
-        summary.globalAttributesList.forEach { attribute ->
-            doc["${attribute.name}"] = attribute.valuesCount == 1 ? attribute.getValues(0) : attribute.getValuesList().toList()
-        }
-
-        def solrdoc = toSolrInputDocument(doc)
-        return solrdoc
+    public NexusService(MetadataStore metadataStore, DataStore dataStore) {
+        this.metadataStore = metadataStore
+        this.dataStore = dataStore
     }
 
-    private determineGeo(def summary) {
-        //Solr cannot index a POLYGON where all corners are the same point or when there are only 2 distinct points (line).
-        //Solr is configured for a specific precision so we need to round to that precision before checking equality.
-        def geoPrecision = environment.getProperty("solrGeoPrecision", Integer.class, 3)
-        def latMin = summary.bbox.latMin.round(geoPrecision)
-        def latMax = summary.bbox.latMax.round(geoPrecision)
-        def lonMin = summary.bbox.lonMin.round(geoPrecision)
-        def lonMax = summary.bbox.lonMax.round(geoPrecision)
-        def geo
-        //If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON
-        if (latMin == latMax && lonMin == lonMax) {
-            geo = "POINT(${lonMin} ${latMin})"
-            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
-        }
-        //If lat min = lat max but lon min != lon max, then we essentially have a line.
-        else if (latMin == latMax) {
-            geo = "LINESTRING (${lonMin} ${latMin}, ${lonMax} ${latMin})"
-            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
-        }
-        //Same if lon min = lon max but lat min != lat max
-        else if (lonMin == lonMax) {
-            geo = "LINESTRING (${lonMin} ${latMin}, ${lonMin} ${latMax})"
-            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
-        }
-        //All other cases should use POLYGON
-        else {
-            geo = "POLYGON((" +
-                    "${lonMin} ${latMin}, " +
-                    "${lonMax} ${latMin}, " +
-                    "${lonMax} ${latMax}, " +
-                    "${lonMin} ${latMax}, " +
-                    "${lonMin} ${latMin}))"
-        }
-
-        return geo
-    }
-
-    def toSolrInputDocument(Map<String, Object> doc) {
-        def solrDoc = new SolrInputDocument()
-        solrDoc.putAll(doc.collectEntries { String key, Object value ->
-            def field = new SolrInputField(key)
-            field.setValue(value, 1.0f)
-            [(key): field]
-        })
-        return solrDoc
-    }
-
-    def getCassandraRowFromTileData(NexusContent.TileData tile) {
-
-        def tileId = UUID.fromString(tile.tileId)
-
-        def row = [tileId, ByteBuffer.wrap(tile.toByteArray())]
-
-        return row
+    def saveToNexus(Collection<NexusContent.NexusTile> nexusTiles) {
+        metadataStore.saveMetadata(nexusTiles)
+        dataStore.saveData(nexusTiles)
     }
 
     public static void main(String... args) {

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy
new file mode 100644
index 0000000..8f91746
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy
@@ -0,0 +1,143 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink
+
+import org.apache.solr.common.SolrInputDocument
+import org.apache.solr.common.SolrInputField
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.TileSummary
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.springframework.core.env.Environment
+import org.springframework.data.solr.core.SolrOperations
+
+import javax.annotation.Resource
+import java.text.SimpleDateFormat
+
+/**
+ * Created by djsilvan on 6/27/17.
+ */
+class SolrStore implements MetadataStore {
+
+    private Environment environment
+    private SolrOperations solr
+
+    private Logger log = LoggerFactory.getLogger(SolrStore.class)
+
+    //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group.
+    private String tableName = "sea_surface_temp"
+
+    private static final def iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
+    static {
+        iso.setTimeZone(TimeZone.getTimeZone("UTC"))
+    }
+
+    public SolrStore(SolrOperations solr) {
+        this.solr = solr
+    }
+
+    @Resource
+    void setEnvironment(Environment environment) {
+        this.environment = environment
+    }
+
+    void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles) {
+
+        def solrdocs = nexusTiles.collect { nexusTile -> getSolrDocFromTileSummary(nexusTile.summary) }
+        solr.saveDocuments(solrdocs, environment.getProperty("solrCommitWithin", Integer.class, 1000))
+    }
+
+    def getSolrDocFromTileSummary(TileSummary summary) {
+
+        def bbox = summary.getBbox()
+        def stats = summary.getStats()
+
+        def startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+        startCal.setTime(new Date(stats.minTime * 1000))
+        def endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+        endCal.setTime(new Date(stats.maxTime * 1000))
+
+        def minTime = iso.format(startCal.getTime())
+        def maxTime = iso.format(endCal.getTime())
+
+        def geo = determineGeo(summary)
+
+        def doc = [
+                "table_s"         : tableName,
+                "geo"             : geo,
+                "id"              : "$summary.tileId".toString(),
+                "solr_id_s"       : "${summary.datasetName}!${summary.tileId}".toString(),
+                "dataset_id_s"    : "$summary.datasetUuid".toString(),
+                "sectionSpec_s"   : "$summary.sectionSpec".toString(),
+                "dataset_s"       : "$summary.datasetName".toString(),
+                "granule_s"       : "$summary.granule".toString(),
+                "tile_var_name_s" : "$summary.dataVarName".toString(),
+                "tile_min_lon"    : bbox.lonMin,
+                "tile_max_lon"    : bbox.lonMax,
+                "tile_min_lat"    : bbox.latMin,
+                "tile_max_lat"    : bbox.latMax,
+                "tile_min_time_dt": minTime,
+                "tile_max_time_dt": maxTime,
+                "tile_min_val_d"  : stats.min,
+                "tile_max_val_d"  : stats.max,
+                "tile_avg_val_d"  : stats.mean,
+                "tile_count_i"    : stats.count
+        ]
+
+        summary.globalAttributesList.forEach { attribute ->
+            doc["${attribute.name}"] = attribute.valuesCount == 1 ? attribute.getValues(0) : attribute.getValuesList().toList()
+        }
+
+        def solrdoc = toSolrInputDocument(doc)
+        return solrdoc
+    }
+
+    private determineGeo(def summary) {
+        //Solr cannot index a POLYGON where all corners are the same point or when there are only 2 distinct points (line).
+        //Solr is configured for a specific precision so we need to round to that precision before checking equality.
+        def geoPrecision = environment.getProperty("solrGeoPrecision", Integer.class, 3)
+        def latMin = summary.bbox.latMin.round(geoPrecision)
+        def latMax = summary.bbox.latMax.round(geoPrecision)
+        def lonMin = summary.bbox.lonMin.round(geoPrecision)
+        def lonMax = summary.bbox.lonMax.round(geoPrecision)
+        def geo
+        //If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON
+        if (latMin == latMax && lonMin == lonMax) {
+            geo = "POINT(${lonMin} ${latMin})"
+            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
+        }
+        //If lat min = lat max but lon min != lon max, then we essentially have a line.
+        else if (latMin == latMax) {
+            geo = "LINESTRING (${lonMin} ${latMin}, ${lonMax} ${latMin})"
+            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
+        }
+        //Same if lon min = lon max but lat min != lat max
+        else if (lonMin == lonMax) {
+            geo = "LINESTRING (${lonMin} ${latMin}, ${lonMin} ${latMax})"
+            log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo")
+        }
+        //All other cases should use POLYGON
+        else {
+            geo = "POLYGON((" +
+                    "${lonMin} ${latMin}, " +
+                    "${lonMax} ${latMin}, " +
+                    "${lonMax} ${latMax}, " +
+                    "${lonMin} ${latMax}, " +
+                    "${lonMin} ${latMin}))"
+        }
+
+        return geo
+    }
+
+    def toSolrInputDocument(Map<String, Object> doc) {
+        def solrDoc = new SolrInputDocument()
+        solrDoc.putAll(doc.collectEntries { String key, Object value ->
+            def field = new SolrInputField(key)
+            field.setValue(value, 1.0f)
+            [(key): field]
+        })
+        return solrDoc
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java
new file mode 100644
index 0000000..19837ab
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java
@@ -0,0 +1,17 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+
+import java.util.Collection;
+
+/**
+ * Created by djsilvan on 6/26/17.
+ */
+public interface DataStore {
+
+    public void saveData(Collection<NexusContent.NexusTile> nexusTiles);
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java
new file mode 100644
index 0000000..86aa120
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java
@@ -0,0 +1,24 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by djsilvan on 8/11/17.
+ */
+public class DataStoreException extends RuntimeException {
+
+    private Logger log = LoggerFactory.getLogger(NexusService.class);
+
+    public DataStoreException() {
+        log.error("Error: DataStore Exception");
+    }
+
+    public DataStoreException(Exception e) {
+        log.error("Error: " + e.getMessage());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java
new file mode 100644
index 0000000..ad8cfbc
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java
@@ -0,0 +1,57 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Created by djsilvan on 6/26/17.
+ */
+public class DynamoStore implements DataStore {
+
+    private DynamoDB dynamoDB;
+    private String tableName;
+    private String primaryKey = "tile_id";
+    private Logger log = LoggerFactory.getLogger(NexusService.class);
+
+    public DynamoStore(AmazonDynamoDB dynamoClient, String tableName) {
+        dynamoDB = new DynamoDB(dynamoClient);
+        this.tableName = tableName;
+    }
+
+    public void saveData(Collection<NexusContent.NexusTile> nexusTiles) {
+
+        Table table = dynamoDB.getTable(tableName);
+
+        for (NexusContent.NexusTile tile : nexusTiles) {
+            String tileId = getTileId(tile);
+            byte[] tileData = getTileData(tile);
+
+            try {
+                table.putItem(new Item().withPrimaryKey(primaryKey, tileId).withBinary("data", tileData));
+            }
+            catch (Exception e) {
+                log.error("Unable to add item: " + tileId);
+                throw new DataStoreException(e);
+            }
+        }
+    }
+
+    private String getTileId(NexusContent.NexusTile tile) {
+        return tile.getTile().getTileId();
+    }
+
+    private byte[] getTileData(NexusContent.NexusTile tile) {
+        return tile.getTile().toByteArray();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java
index fc89aaf..2f4bab3 100644
--- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java
@@ -4,6 +4,11 @@
 *****************************************************************************/
 package org.nasa.jpl.nexus.ingest.nexussink;
 
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.s3.AmazonS3Client;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
@@ -34,44 +39,54 @@ import static org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata.*;
 @Configuration
 public class InfrastructureConfiguration {
 
-    @Resource
-    private Environment environment;
+    @Configuration
+    @Profile("cassandra")
+    static class CassandraConfiguration {
+        @Resource
+        private Environment environment;
 
-    @Bean
-    public CassandraClusterFactoryBean cluster() {
+        @Bean
+        public CassandraClusterFactoryBean cluster() {
 
-        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
-        cluster.setContactPoints(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_CONTACT_POINTS));
-        cluster.setPort(Integer.parseInt(environment.getProperty(PROPERTY_NAME_CASSANDRA_PORT)));
+            CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
+            cluster.setContactPoints(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_CONTACT_POINTS));
+            cluster.setPort(Integer.parseInt(environment.getProperty(PROPERTY_NAME_CASSANDRA_PORT)));
 
-        return cluster;
-    }
+            return cluster;
+        }
 
-    @Bean
-    public CassandraMappingContext mappingContext() {
-        return new BasicCassandraMappingContext();
-    }
+        @Bean
+        public CassandraMappingContext mappingContext() {
+            return new BasicCassandraMappingContext();
+        }
 
-    @Bean
-    public CassandraConverter converter() {
-        return new MappingCassandraConverter(mappingContext());
-    }
+        @Bean
+        public CassandraConverter converter() {
+            return new MappingCassandraConverter(mappingContext());
+        }
 
-    @Bean
-    public CassandraSessionFactoryBean session() throws Exception {
+        @Bean
+        public CassandraSessionFactoryBean session() throws Exception {
 
-        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
-        session.setCluster(cluster().getObject());
-        session.setKeyspaceName(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_KEYSPACE));
-        session.setConverter(converter());
-        session.setSchemaAction(SchemaAction.NONE);
+            CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
+            session.setCluster(cluster().getObject());
+            session.setKeyspaceName(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_KEYSPACE));
+            session.setConverter(converter());
+            session.setSchemaAction(SchemaAction.NONE);
 
-        return session;
-    }
+            return session;
+        }
+
+        @Bean
+        public CassandraOperations cassandraTemplate() throws Exception {
+            return new CassandraTemplate(session().getObject());
+        }
 
-    @Bean
-    public CassandraOperations cassandraTemplate() throws Exception {
-        return new CassandraTemplate(session().getObject());
+        @Bean
+        public DataStore dataStore(CassandraOperations cassandraTemplate) {
+            DataStore dataStore = new CassandraStore(cassandraTemplate);
+            return dataStore;
+        }
     }
 
     @Configuration
@@ -93,12 +108,18 @@ public class InfrastructureConfiguration {
         public SolrOperations solrTemplate(SolrClient solrClient) {
             return new SolrTemplate(solrClient);
         }
+
+        @Bean
+        public MetadataStore metadataStore(SolrOperations solrTemplate) {
+            MetadataStore metadataStore = new SolrStore(solrTemplate);
+            return metadataStore;
+        }
     }
 
+
     @Configuration
     @Profile("solr-cloud")
     static class SolrCloudConfiguration {
-
         @Resource
         private Environment environment;
 
@@ -111,7 +132,7 @@ public class InfrastructureConfiguration {
         @Bean
         public SolrClient solrClient(){
             CloudSolrClient client = new CloudSolrClient(solrCloudZkHost);
-//            client.setIdField("dataset_s");
+            //client.setIdField("dataset_s");
             client.setDefaultCollection(solrCollection);
 
             return client;
@@ -121,7 +142,57 @@ public class InfrastructureConfiguration {
         public SolrOperations solrTemplate(SolrClient solrClient) {
             return new SolrTemplate(solrClient);
         }
+
+        @Bean
+        public MetadataStore metadataStore(SolrOperations solrTemplate) {
+            MetadataStore metadataStore = new SolrStore(solrTemplate);
+            return metadataStore;
+        }
     }
 
+    @Configuration
+    @Profile("s3")
+    static class S3Configuration {
+        @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_S3_BUCKET]}")
+        private String s3BucketName;
+
+        @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_AWS_REGION]}")
+        private String s3Region;
+
+        @Bean
+        public AmazonS3Client s3client() {
+            AmazonS3Client s3Client = new AmazonS3Client();
+            s3Client.setRegion(Region.getRegion(Regions.fromName(s3Region)));
+            return s3Client;
+        }
 
+        @Bean
+        public DataStore dataStore(AmazonS3Client s3Client) {
+            S3Store s3Store = new S3Store(s3Client, s3BucketName);
+            return s3Store;
+        }
+    }
+
+    @Configuration
+    @Profile("dynamo")
+    static class DynamoConfiguration {
+        @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_DYNAMO_TABLE_NAME]}")
+        private String dynamoTableName;
+
+        @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_AWS_REGION]}")
+        private String dynamoRegion;
+
+        @Bean
+        public AmazonDynamoDB dynamoClient() {
+            AmazonDynamoDB dynamoClient = new AmazonDynamoDBClient();
+            dynamoClient.setRegion(Region.getRegion(Regions.fromName(dynamoRegion)));
+            return dynamoClient;
+        }
+
+        @Bean
+        public DataStore dataStore(AmazonDynamoDB dynamoClient) {
+            DynamoStore dynamoStore = new DynamoStore(dynamoClient, dynamoTableName);
+            return dynamoStore;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java
index 3ae5776..fe8b2b1 100644
--- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java
@@ -6,37 +6,26 @@ package org.nasa.jpl.nexus.ingest.nexussink;
 
 
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.lang.ArrayUtils;
 import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
-import org.springframework.context.annotation.Profile;
 import org.springframework.core.convert.converter.Converter;
 import org.springframework.core.env.Environment;
-import org.springframework.data.cassandra.core.CassandraOperations;
-import org.springframework.data.solr.core.SolrOperations;
-import org.springframework.integration.channel.DirectChannel;
-import org.springframework.integration.channel.interceptor.WireTap;
 import org.springframework.integration.config.EnableIntegration;
 import org.springframework.integration.config.IntegrationConverter;
 import org.springframework.integration.dsl.IntegrationFlow;
 import org.springframework.integration.dsl.IntegrationFlows;
 import org.springframework.integration.dsl.channel.MessageChannels;
-import org.springframework.integration.handler.LoggingHandler;
-import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 import javax.annotation.Resource;
-import java.lang.reflect.Array;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
 
 /**
  * Created by greguska on 3/1/16.
@@ -55,10 +44,10 @@ public class IntegrationConfiguration {
     private static final Integer GROUP_TIMEOUT_MS = 2000;
 
     @Autowired
-    private SolrOperations solr;
+    private MetadataStore metadataStore;
 
     @Autowired
-    private CassandraOperations cassandraTemplate;
+    private DataStore dataStore;
 
     @Bean
     public MessageChannel input() {
@@ -91,7 +80,7 @@ public class IntegrationConfiguration {
 
     @Bean
     public NexusService nexus() {
-        return new NexusService(solr, cassandraTemplate);
+        return new NexusService(metadataStore, dataStore);
     }
 
     @Bean
@@ -110,22 +99,6 @@ public class IntegrationConfiguration {
         };
     }
 
-//    @Bean
-//    @IntegrationConverter
-//    public Converter byteObjectArrayToNexusTileConverter() {
-//        return new Converter<Byte[], NexusContent.NexusTile>() {
-//            @Override
-//            public NexusContent.NexusTile convert(Byte[] source) {
-//
-//                try {
-//                    return NexusContent.NexusTile.newBuilder().mergeFrom(ArrayUtils.toPrimitive(source)).build();
-//                } catch (InvalidProtocolBufferException e) {
-//                    throw new RuntimeException("Could not convert message.", e);
-//                }
-//            }
-//        };
-//    }
-
     @Bean
     public TaskScheduler taskScheduler(){
         ThreadPoolTaskScheduler tpts = new ThreadPoolTaskScheduler();
@@ -133,4 +106,5 @@ public class IntegrationConfiguration {
         return tpts;
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java
new file mode 100644
index 0000000..31d5290
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java
@@ -0,0 +1,17 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+
+import java.util.Collection;
+
+/**
+ * Created by djsilvan on 6/26/17.
+ */
+public interface MetadataStore {
+
+    public void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles);
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java
index 7ef3a30..878941d 100644
--- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java
@@ -28,6 +28,15 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{
 
     public static final String PROPERTY_NAME_INSERT_BUFFER = "insertBuffer";
 
+    public static final String PROPERTY_NAME_S3_BUCKET = "s3BucketName";
+    public static final String PROPERTY_NAME_AWS_REGION = "awsRegion";
+    public static final String PROPERTY_NAME_DYNAMO_TABLE_NAME = "dynamoTableName";
+
+    private String s3BucketName = "";
+    private String awsRegion = "";
+    private String dynamoTableName = "";
+    private String dataStore = "";
+
     private String cassandraContactPoints = null;
     private String cassandraKeyspace = null;
     private Integer cassandraPort = 9042;
@@ -42,7 +51,6 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{
      * Cassandra settings
      */
 
-    @NotNull
     public String getCassandraContactPoints(){
         return this.cassandraContactPoints;
     }
@@ -52,7 +60,6 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{
         this.cassandraContactPoints = cassandraContactPoints;
     }
 
-    @NotNull
     public String getCassandraKeyspace(){
         return this.cassandraKeyspace;
     }
@@ -114,22 +121,105 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{
     @ModuleOption(value = "number of messages to buffer before inserting into Nexus", defaultValue = "0")
     public void setInsertBuffer(Integer insertBuffer){ this.insertBuffer = insertBuffer; }
 
+    @ModuleOption(value = "The name of the S3 bucket", defaultValue = "nexus-jpl")
+    public void setS3BucketName(String s3BucketName) {
+        this.s3BucketName = s3BucketName;
+    }
+
+    public String getS3BucketName() {
+        return this.s3BucketName;
+    }
+
+    @ModuleOption(value = "The AWS region", defaultValue = "us-west-2")
+    public void setAwsRegion(String awsRegion) {
+        this.awsRegion = awsRegion;
+    }
+
+    public String getAwsRegion() {
+        return this.awsRegion;
+    }
+
+    @ModuleOption(value = "The name of the dynamoDB table", defaultValue = "nexus-jpl-table")
+    public void setDynamoTableName(String dynamoTableName) {
+        this.dynamoTableName = dynamoTableName;
+    }
+
+    public String getDynamoTableName() {
+        return this.dynamoTableName;
+    }
+
+    @AssertTrue(message = "Either "+PROPERTY_NAME_CASSANDRA_KEYSPACE+", "+PROPERTY_NAME_S3_BUCKET+", or "
+            +PROPERTY_NAME_DYNAMO_TABLE_NAME+" is allowed but not more than 1.")
+    public boolean isOptionMutuallyExclusiveDataStore() {
+        return Exclusives.atMostOneOf(StringUtils.isNotEmpty(getCassandraKeyspace()), StringUtils.isNotEmpty(getS3BucketName()),
+                StringUtils.isNotEmpty(getDynamoTableName()));
+    }
+
+    @AssertTrue(message = "Both "+PROPERTY_NAME_CASSANDRA_KEYSPACE+" and "+PROPERTY_NAME_CASSANDRA_CONTACT_POINTS+
+            " are required if using Cassandra.")
+    public boolean isCassandraConfigured() {
+        if (StringUtils.isEmpty(getCassandraKeyspace())) {
+            return true; //If Cassandra isn't used, return true to avoid test failures
+        }
+
+        return StringUtils.isNotEmpty(getCassandraContactPoints());
+    }
+
+    @AssertTrue(message = "Both "+PROPERTY_NAME_S3_BUCKET+" and "+PROPERTY_NAME_AWS_REGION+" are required if using S3.")
+    public boolean isS3Configured() {
+        if (StringUtils.isEmpty(getS3BucketName())) {
+            return true; //If S3 isn't used, return true to avoid test failures
+        }
+
+        return StringUtils.isNotEmpty(getAwsRegion());
+    }
+
+    @AssertTrue(message = "Both "+PROPERTY_NAME_DYNAMO_TABLE_NAME+" and "+PROPERTY_NAME_AWS_REGION+" are required if using DynamoDB.")
+    public boolean isDynamoConfigured() {
+        if (StringUtils.isEmpty(getDynamoTableName())) {
+            return true; //If DynamoDB isn't used, return true to avoid test failures
+        }
+
+        return StringUtils.isNotEmpty(getAwsRegion());
+    }
 
     @AssertTrue(message = "Either "+PROPERTY_NAME_SOLR_SERVER_URL+" or "+PROPERTY_NAME_SOLR_CLOUD_ZK_URL+" is allowed but not both.")
-    public boolean isOptionMutuallyExclusive(){
+    public boolean isOptionMutuallyExclusiveMetadataStore() {
         return Exclusives.atMostOneOf(StringUtils.isNotEmpty(getSolrCloudZkHost()), StringUtils.isNotEmpty(getSolrUrl()));
     }
 
     @Override
     public String[] profilesToActivate() {
-        if(StringUtils.isNotEmpty(getSolrCloudZkHost())){
-            return new String[]{"solr-cloud"};
-        }else{
-            if("http://embedded/".equals(getSolrUrl())){
-                return new String[]{"solr-embedded"};
-            }else {
-                return new String[]{"solr-standalone"};
+        String[] profiles = new String[2];
+
+        if (StringUtils.isNotEmpty(getSolrCloudZkHost())) {
+            profiles[0] = "solr-cloud";
+        }
+        else {
+            if ("http://embedded/".equals(getSolrUrl())) {
+                profiles[0] = "solr-embedded";
             }
+            else {
+                profiles[0] = "solr-standalone";
+            }
+        }
+        if (StringUtils.isNotEmpty(getCassandraKeyspace())) {
+            profiles[1] = "cassandra";
         }
+        else {
+            if (StringUtils.isNotEmpty(getS3BucketName())) {
+                if (StringUtils.isNotEmpty(getAwsRegion())) {
+                    profiles[1] = "s3";
+                }
+                else {
+                    profiles[1] = "s3local";
+                }
+            }
+            else {
+                profiles[1] = "dynamo";
+            }
+        }
+
+        return profiles;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java
----------------------------------------------------------------------
diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java
new file mode 100644
index 0000000..efdc670
--- /dev/null
+++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java
@@ -0,0 +1,68 @@
+/*****************************************************************************
+ * Copyright (c) 2017 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+package org.nasa.jpl.nexus.ingest.nexussink;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.*;
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Collection;
+
+/**
+ * Created by djsilvan on 6/26/17.
+ */
+public class S3Store implements DataStore {
+
+    private AmazonS3 s3;
+    private String bucketName;
+    private Logger log = LoggerFactory.getLogger(NexusService.class);
+
+    public S3Store(AmazonS3Client s3client, String bucketName) {
+        s3 = s3client;
+        this.bucketName = bucketName;
+    }
+
+    public void saveData(Collection<NexusTile> nexusTiles) {
+
+        for (NexusTile tile : nexusTiles) {
+            String tileId = getTileId(tile);
+            byte[] tileData = getTileData(tile);
+            Long contentLength = (long) tileData.length;
+            InputStream stream = new ByteArrayInputStream(tileData);
+            ObjectMetadata meta = new ObjectMetadata();
+            meta.setContentLength(contentLength);
+
+            try {
+                s3.putObject(new PutObjectRequest(bucketName, tileId, stream, meta));
+            }
+            catch (AmazonServiceException ase) {
+                log.error("Caught an AmazonServiceException, which means your request made it "
+                        + "to Amazon S3, but was rejected with an error response for some reason.");
+                throw new DataStoreException(ase);
+            }
+            catch (AmazonClientException ace) {
+                log.error("Caught an AmazonClientException, which means the client encountered "
+                        + "a serious internal problem while trying to communicate with S3, "
+                        + "such as not being able to access the network.");
+                throw new DataStoreException(ace);
+            }
+        }
+    }
+
+    private String getTileId(NexusTile tile) {
+        return tile.getTile().getTileId();
+    }
+
+    private byte[] getTileData(NexusTile tile) {
+        return tile.getTile().toByteArray();
+    }
+}



Mime
View raw message