From: tloubrieu@apache.org
To: commits@sdap.apache.org
Reply-To: dev@sdap.apache.org
Date: Tue, 11 Aug 2020 01:59:34 +0000
Subject: [incubator-sdap-nexus] 11/28: read cli args for cass and solr hosts

This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch SDAP-268 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git commit 4e3c24ac39704e37515bdaf5927e0ef598593e2a Author: Eamon Ford AuthorDate: Wed Jul 22 16:10:53 2020 -0700 read cli args for cass and solr hosts --- .gitignore | 1 - analysis/setup.py | 3 +- analysis/tests/algorithms_spark/Matchup_test.py | 321 ---------- analysis/tests/algorithms_spark/__init__.py | 16 - analysis/webservice/algorithms/Capabilities.py | 3 - analysis/webservice/algorithms/CorrelationMap.py | 3 - .../algorithms/DailyDifferenceAverage.py | 3 - .../webservice/algorithms/DataInBoundsSearch.py | 6 - analysis/webservice/algorithms/DataSeriesList.py | 7 +- analysis/webservice/algorithms/DelayTest.py | 3 - analysis/webservice/algorithms/ErrorTosserTest.py | 3 - analysis/webservice/algorithms/Heartbeat.py | 3 - analysis/webservice/algorithms/HofMoeller.py | 10 +- .../webservice/algorithms/LongitudeLatitudeMap.py | 6 - .../algorithms/StandardDeviationSearch.py | 5 - analysis/webservice/algorithms/TileSearch.py | 3 - analysis/webservice/algorithms/TimeAvgMap.py | 3 - analysis/webservice/algorithms/TimeSeries.py | 22 +- analysis/webservice/algorithms/TimeSeriesSolr.py | 7 +- .../webservice/algorithms/doms/BaseDomsHandler.py | 635 ------------------- .../webservice/algorithms/doms/DatasetListQuery.py | 116 ---- .../algorithms/doms/DomsInitialization.py | 164 ----- .../webservice/algorithms/doms/MatchupQuery.py | 452 ------------- .../webservice/algorithms/doms/MetadataQuery.py | 65 -- .../webservice/algorithms/doms/ResultsPlotQuery.py | 55 -- .../webservice/algorithms/doms/ResultsRetrieval.py | 49 -- .../webservice/algorithms/doms/ResultsStorage.py | 286 --------- analysis/webservice/algorithms/doms/StatsQuery.py | 63 -- analysis/webservice/algorithms/doms/ValuesQuery.py | 72 --- analysis/webservice/algorithms/doms/__init__.py | 34 - analysis/webservice/algorithms/doms/config.py | 109 ---- analysis/webservice/algorithms/doms/datafetch.py | 47 -- .../algorithms/doms/domsconfig.ini.default | 15 - .../webservice/algorithms/doms/fetchedgeimpl.py | 217 ------- analysis/webservice/algorithms/doms/geo.py | 129 ---- .../webservice/algorithms/doms/histogramplot.py | 127 ---- .../webservice/algorithms/doms/insitusubset.py | 263 -------- analysis/webservice/algorithms/doms/mapplot.py | 175 ----- analysis/webservice/algorithms/doms/scatterplot.py | 118 ---- analysis/webservice/algorithms/doms/subsetter.py | 260 -------- analysis/webservice/algorithms/doms/values.py | 72 --- .../webservice/algorithms/doms/workerthread.py | 61 -- analysis/webservice/algorithms_spark/Matchup.py | 703 --------------------- .../algorithms_spark/NexusCalcSparkHandler.py | 1 + analysis/webservice/algorithms_spark/__init__.py | 6 - analysis/webservice/config/web.ini | 2 +- analysis/webservice/webapp.py | 3 +- .../nexustiles/config/datastores.ini.default | 4 +- data-access/nexustiles/nexustiles.py | 6 +- helm/templates/webapp.yml | 2 +- tools/doms/README.md | 66 -- tools/doms/doms_reader.py | 144 ----- 52 files changed, 31 insertions(+), 4918 deletions(-) diff --git a/.gitignore b/.gitignore index 3e29626..4e4cf6e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,5 @@ *.code-workspace *.idea *.DS_Store -analysis/webservice/algorithms/doms/domsconfig.ini data-access/nexustiles/config/datastores.ini venv/ diff --git a/analysis/setup.py b/analysis/setup.py index 62a6891..9a449ce 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -50,8 +50,7 @@ setuptools.setup( # 
'webservice.nexus_tornado.request.renderers' #], package_data={ - 'webservice': ['config/web.ini', 'config/algorithms.ini'], - 'webservice.algorithms.doms': ['domsconfig.ini.default'] + 'webservice': ['config/web.ini', 'config/algorithms.ini'] }, data_files=[ ('static', ['static/index.html']) diff --git a/analysis/tests/algorithms_spark/Matchup_test.py b/analysis/tests/algorithms_spark/Matchup_test.py deleted file mode 100644 index 5dee17c..0000000 --- a/analysis/tests/algorithms_spark/Matchup_test.py +++ /dev/null @@ -1,321 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pickle -import random -import timeit -import unittest - -from webservice.algorithms_spark.Matchup import * - - -class TestMatch_Points(unittest.TestCase): - def test_one_point_match_exact(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=2) - - primary_points = [primary] - matchup_points = [matchup] - - matches = list(match_points_generator(primary_points, matchup_points, 0)) - - self.assertEquals(1, len(matches)) - - p_match_point, match = matches[0] - - self.assertEqual(primary, p_match_point) - self.assertEqual(matchup, match) - - def test_one_point_match_within_tolerance_150km(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, data_id=2) - - primary_points = [primary] - matchup_points = [matchup] - - matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km - - self.assertEquals(1, len(matches)) - - p_match_point, match = matches[0] - - self.assertEqual(primary, p_match_point) - self.assertEqual(matchup, match) - - def test_one_point_match_within_tolerance_200m(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2) - - primary_points = [primary] - matchup_points = [matchup] - - matches = list(match_points_generator(primary_points, matchup_points, 200)) # tolerance 200 m - - self.assertEquals(1, len(matches)) - - p_match_point, match = matches[0] - - self.assertEqual(primary, p_match_point) - self.assertEqual(matchup, match) - - def test_one_point_not_match_tolerance_150km(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, data_id=2) - - primary_points = [primary] - matchup_points = [matchup] - - matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km - - self.assertEquals(0, len(matches)) - - def 
test_one_point_not_match_tolerance_100m(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2) - - primary_points = [primary] - matchup_points = [matchup] - - matches = list(match_points_generator(primary_points, matchup_points, 100)) # tolerance 100 m - - self.assertEquals(0, len(matches)) - - def test_multiple_point_match(self): - primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1) - primary_points = [primary] - - matchup_points = [ - DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=2), - DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=3), - DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=4) - ] - - matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km - - self.assertEquals(3, len(matches)) - - self.assertSetEqual({primary}, {x[0] for x in matches}) - - list_of_matches = [x[1] for x in matches] - - self.assertEquals(3, len(list_of_matches)) - self.assertItemsEqual(matchup_points, list_of_matches) - - def test_multiple_point_match_multiple_times(self): - primary_points = [ - DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1), - DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, data_id=2) - ] - - matchup_points = [ - DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=3), - DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=4), - DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=5) - ] - - matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km - - self.assertEquals(5, len(matches)) - - self.assertSetEqual({p for p in primary_points}, {x[0] for x in matches}) - - # First primary point matches all 3 secondary - self.assertEquals(3, [x[0] for x in matches].count(primary_points[0])) - self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] == primary_points[0]]) - - # Second primary point matches only last 2 secondary - self.assertEquals(2, [x[0] for x in matches].count(primary_points[1])) - self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if x[0] == primary_points[1]]) - - def test_one_of_many_primary_matches_one_of_many_matchup(self): - primary_points = [ - DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, data_id=1), - DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, data_id=2) - ] - - matchup_points = [ - DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, depth=3.973, data_id=3), - DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, depth=2.9798, data_id=4), - DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, depth=4.07, data_id=5) - ] - - matches = list(match_points_generator(primary_points, matchup_points, 110000)) # tolerance 110 km - - self.assertEquals(1, len(matches)) - - self.assertSetEqual({p for p in primary_points if p.data_id == 2}, {x[0] for x in matches}) - - # First primary point matches none - self.assertEquals(0, [x[0] for x in matches].count(primary_points[0])) - - # Second primary point matches only first secondary - self.assertEquals(1, [x[0] for x in matches].count(primary_points[1])) - self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if x[0] == primary_points[1]]) - - @unittest.skip("This test is just for timing, doesn't actually assert anything.") - def 
test_time_many_primary_many_matchup(self): - import logging - import sys - logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout) - log = logging.getLogger(__name__) - # Generate 160000 DomsPoints distributed equally in a box from -2.0 lat/lon to 2.0 lat/lon - log.info("Generating primary points") - x = np.arange(-2.0, 2.0, 0.01) - y = np.arange(-2.0, 2.0, 0.01) - primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], time=1000, depth=5.0, data_id=i) for i, xy in - enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 2))] - - # Generate 2000 DomsPoints distributed randomly in a box from -2.0 lat/lon to 2.0 lat/lon - log.info("Generating matchup points") - matchup_points = [ - DomsPoint(longitude=random.uniform(-2.0, 2.0), latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0, - data_id=i) for i in xrange(0, 2000)] - - log.info("Starting matchup") - log.info("Best of repeat(3, 2) matchups: %s seconds" % min( - timeit.repeat(lambda: list(match_points_generator(primary_points, matchup_points, 1500)), repeat=3, - number=2))) - - -class TestDOMSPoint(unittest.TestCase): - def test_is_pickleable(self): - edge_point = json.loads("""{ -"id": "argo-profiles-5903995(46, 0)", -"time": "2012-10-15T14:24:04Z", -"point": "-33.467 29.728", -"sea_water_temperature": 24.5629997253, -"sea_water_temperature_depth": 2.9796258642, -"wind_speed": null, -"sea_water_salinity": null, -"sea_water_salinity_depth": null, -"platform": 4, -"device": 3, -"fileurl": "ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc" -}""") - point = DomsPoint.from_edge_point(edge_point) - self.assertIsNotNone(pickle.dumps(point)) - - -def check_all(): - return check_solr() and check_cass() and check_edge() - - -def check_solr(): - # TODO eventually this might do something. - return False - - -def check_cass(): - # TODO eventually this might do something. - return False - - -def check_edge(): - # TODO eventually this might do something. 
- return False - - -@unittest.skipUnless(check_all(), - "These tests require local instances of Solr, Cassandra, and Edge to be running.") -class TestMatchup(unittest.TestCase): - def setUp(self): - from os import environ - environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7' - environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7' - environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7' - - def test_mur_match(self): - from shapely.wkt import loads - from nexustiles.nexustiles import NexusTileService - - polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") - primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" - matchup_ds = "spurs" - parameter = "sst" - start_time = 1350259200 # 2012-10-15T00:00:00Z - end_time = 1350345600 # 2012-10-16T00:00:00Z - time_tolerance = 86400 - depth_tolerance = 5.0 - radius_tolerance = 1500.0 - platforms = "1,2,3,4,5,6,7,8,9" - - tile_service = NexusTileService() - tile_ids = [tile.tile_id for tile in - tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, - fl='id')] - result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance, - depth_tolerance, radius_tolerance, platforms) - for k, v in result.iteritems(): - print "primary: %s\n\tmatches:\n\t\t%s" % ( - "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst), - '\n\t\t'.join( - ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])) - - def test_smap_match(self): - from shapely.wkt import loads - from nexustiles.nexustiles import NexusTileService - - polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") - primary_ds = "SMAP_L2B_SSS" - matchup_ds = "spurs" - parameter = "sss" - start_time = 1350259200 # 2012-10-15T00:00:00Z - end_time = 1350345600 # 2012-10-16T00:00:00Z - time_tolerance = 86400 - depth_tolerance = 5.0 - radius_tolerance = 1500.0 - platforms = "1,2,3,4,5,6,7,8,9" - - tile_service = NexusTileService() - tile_ids = [tile.tile_id for tile in - tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, - fl='id')] - result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance, - depth_tolerance, radius_tolerance, platforms) - for k, v in result.iteritems(): - print "primary: %s\n\tmatches:\n\t\t%s" % ( - "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst), - '\n\t\t'.join( - ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v])) - - def test_ascatb_match(self): - from shapely.wkt import loads - from nexustiles.nexustiles import NexusTileService - - polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))") - primary_ds = "ASCATB-L2-Coastal" - matchup_ds = "spurs" - parameter = "wind" - start_time = 1351468800 # 2012-10-29T00:00:00Z - end_time = 1351555200 # 2012-10-30T00:00:00Z - time_tolerance = 86400 - depth_tolerance = 5.0 - radius_tolerance = 110000.0 # 110 km - platforms = "1,2,3,4,5,6,7,8,9" - - tile_service = NexusTileService() - tile_ids = [tile.tile_id for tile in - tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False, - fl='id')] - result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, 
parameter, time_tolerance, - depth_tolerance, radius_tolerance, platforms) - for k, v in result.iteritems(): - print "primary: %s\n\tmatches:\n\t\t%s" % ( - "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v), - '\n\t\t'.join( - ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % ( - i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v])) diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py deleted file mode 100644 index 0707368..0000000 --- a/analysis/tests/algorithms_spark/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py index f507587..fa85a7c 100644 --- a/analysis/webservice/algorithms/Capabilities.py +++ b/analysis/webservice/algorithms/Capabilities.py @@ -29,9 +29,6 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): capabilities = [] diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py index 1726412..1d8a0ad 100644 --- a/analysis/webservice/algorithms/CorrelationMap.py +++ b/analysis/webservice/algorithms/CorrelationMap.py @@ -41,9 +41,6 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py index 1b4d642..0ffd83b 100644 --- a/analysis/webservice/algorithms/DailyDifferenceAverage.py +++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py @@ -80,9 +80,6 @@ class DailyDifferenceAverageImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, request, **args): min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon() dataset1 = request.get_argument("ds1", None) diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py index 2da6891..fa69416 100644 --- a/analysis/webservice/algorithms/DataInBoundsSearch.py +++ b/analysis/webservice/algorithms/DataInBoundsSearch.py @@ -14,7 +14,6 @@ # limitations under the License. 
-import logging from datetime import datetime from pytz import timezone @@ -67,13 +66,8 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py index 16736b2..e9275ed 100644 --- a/analysis/webservice/algorithms/DataSeriesList.py +++ b/analysis/webservice/algorithms/DataSeriesList.py @@ -20,6 +20,10 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.NexusHandler import nexus_handler from webservice.webmodel import cached +import logging + + +logger = logging.getLogger(__name__) @nexus_handler class DataSeriesListCalcHandlerImpl(NexusCalcHandler): @@ -28,9 +32,6 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler): description = "Lists datasets currently available for analysis" params = {} - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - @cached(ttl=(60 * 60 * 1000)) # 1 hour cached def calc(self, computeOptions, **args): class SimpleResult(object): diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py index e2c1b30..de56f56 100644 --- a/analysis/webservice/algorithms/DelayTest.py +++ b/analysis/webservice/algorithms/DelayTest.py @@ -28,9 +28,6 @@ class DelayCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): time.sleep(10) diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py index dc4d617..0100552 100644 --- a/analysis/webservice/algorithms/ErrorTosserTest.py +++ b/analysis/webservice/algorithms/ErrorTosserTest.py @@ -26,9 +26,6 @@ class ErrorTosserCalcHandler(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): a = 100 / 0.0 # raise Exception("I'm Mad!") diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py index ae7fcee..bc1f50f 100644 --- a/analysis/webservice/algorithms/Heartbeat.py +++ b/analysis/webservice/algorithms/Heartbeat.py @@ -28,9 +28,6 @@ class HeartbeatCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): solrOnline = self._get_tile_service().pingSolr() diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py index 563ea3d..60252ab 100644 --- a/analysis/webservice/algorithms/HofMoeller.py +++ b/analysis/webservice/algorithms/HofMoeller.py @@ -39,6 +39,9 @@ LONGITUDE = 1 if not matplotlib.get_backend(): matplotlib.use('Agg') +logger = logging.getLogger(__name__) + + class LongitudeHofMoellerCalculator(object): def longitude_time_hofmoeller_stats(self, tile, index): stat = { @@ -93,9 +96,6 @@ class LatitudeHofMoellerCalculator(object): class BaseHoffMoellerCalcHandlerImpl(NexusCalcHandler): - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True): shape = (len(results), 
len(results[0][pivot])) @@ -168,7 +168,7 @@ class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.") except KeyError: pass @@ -234,7 +234,7 @@ class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.") except KeyError: pass diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py index 3f0467a..031d893 100644 --- a/analysis/webservice/algorithms/LongitudeLatitudeMap.py +++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py @@ -14,7 +14,6 @@ # limitations under the License. -import logging import math from datetime import datetime @@ -74,13 +73,8 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py index 231c687..1975d2d 100644 --- a/analysis/webservice/algorithms/StandardDeviationSearch.py +++ b/analysis/webservice/algorithms/StandardDeviationSearch.py @@ -73,13 +73,8 @@ class StandardDeviationSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py index a3758bc..321d94f 100644 --- a/analysis/webservice/algorithms/TileSearch.py +++ b/analysis/webservice/algorithms/TileSearch.py @@ -62,9 +62,6 @@ class ChunkSearchCalcHandlerImpl(NexusCalcHandler): } } - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py index 3a609c5..93a9a00 100644 --- a/analysis/webservice/algorithms/TimeAvgMap.py +++ b/analysis/webservice/algorithms/TimeAvgMap.py @@ -37,9 +37,6 @@ class TimeAvgMapCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=False) - def _find_native_resolution(self): # Get a quick set of tiles (1 degree at center of box) at 1 time stamp midLat = (self._minLat + self._maxLat) / 2 diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py index 85613d9..b1d0675 100644 --- a/analysis/webservice/algorithms/TimeSeries.py +++ b/analysis/webservice/algorithms/TimeSeries.py @@ -41,6 +41,7 @@ SENTINEL = 'STOP' EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' +logger = logging.getLogger(__name__) @nexus_handler class 
TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -84,13 +85,8 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset() @@ -185,7 +181,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception: stats = {} tb = traceback.format_exc() - self.log.warn("Error when calculating comparison stats:\n%s" % tb) + logger.warn("Error when calculating comparison stats:\n%s" % tb) else: stats = {} @@ -199,7 +195,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch, endTime=end_seconds_from_epoch) - self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) + logger.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) return res def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch, @@ -214,7 +210,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): ds, start_seconds_from_epoch, end_seconds_from_epoch) - self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if len(daysinrange) == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -248,7 +244,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass @@ -259,7 +255,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): manager.shutdown() results = sorted(results, key=lambda entry: entry["time"]) - self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if apply_seasonal_cycle_filter: the_time = datetime.now() @@ -272,7 +268,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result['meanSeasonal'] = seasonal_mean result['minSeasonal'] = seasonal_min result['maxSeasonal'] = seasonal_max - self.log.info( + logger.info( "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) the_time = datetime.now() @@ -291,9 +287,9 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception as e: # If it doesn't work log the error but ignore it tb = traceback.format_exc() - self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) + logger.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) - self.log.info( + logger.info( "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) return results, {} diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py index fbe4d43..49d75db 100644 --- a/analysis/webservice/algorithms/TimeSeriesSolr.py +++ b/analysis/webservice/algorithms/TimeSeriesSolr.py @@ -33,6 +33,7 @@ from webservice.webmodel import NexusResults, NexusProcessingException, NoDataEx SENTINEL = 'STOP' +logger = logging.getLogger(__name__) @nexus_handler class TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -42,10 +43,6 @@ class 
TimeSeriesCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def calc(self, computeOptions, **args): """ @@ -133,7 +130,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py deleted file mode 100644 index d07f929..0000000 --- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py +++ /dev/null @@ -1,635 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import StringIO -import os -import csv -import json -from datetime import datetime -import time -from decimal import Decimal - -import numpy as np -from pytz import timezone, UTC - -import config -import geo -from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler -from webservice.webmodel import NexusResults - -EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) -ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' - -try: - from osgeo import gdal - from osgeo.gdalnumeric import * -except ImportError: - import gdal - from gdalnumeric import * - -from netCDF4 import Dataset -import netCDF4 -import tempfile - - -class BaseDomsQueryCalcHandler(BaseHandler): - def __init__(self): - BaseHandler.__init__(self) - - def getDataSourceByName(self, source): - for s in config.ENDPOINTS: - if s["name"] == source: - return s - return None - - def _does_datasource_exist(self, ds): - for endpoint in config.ENDPOINTS: - if endpoint["name"] == ds: - return True - return False - - -class DomsEncoder(json.JSONEncoder): - def __init__(self, **args): - json.JSONEncoder.__init__(self, **args) - - def default(self, obj): - # print 'MyEncoder.default() called' - # print type(obj) - if obj == np.nan: - return None # hard code string for now - elif isinstance(obj, datetime): - return long((obj - EPOCH).total_seconds()) - elif isinstance(obj, Decimal): - return str(obj) - else: - return json.JSONEncoder.default(self, obj) - - -class DomsQueryResults(NexusResults): - def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None, - executionId=None, status_code=200): - NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions, - status_code=status_code) - self.__args = args - self.__bounds = bounds - self.__count = count - self.__details = details - self.__executionId = str(executionId) - - def toJson(self): - bounds = self.__bounds.toMap() if self.__bounds is 
not None else {} - return json.dumps( - {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds, - "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder) - - def toCSV(self): - return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details) - - def toNetCDF(self): - return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details) - - -class DomsCSVFormatter: - @staticmethod - def create(executionId, results, params, details): - - csv_mem_file = StringIO.StringIO() - try: - DomsCSVFormatter.__addConstants(csv_mem_file) - DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details) - csv.writer(csv_mem_file).writerow([]) - - DomsCSVFormatter.__packValues(csv_mem_file, results, params) - - csv_out = csv_mem_file.getvalue() - finally: - csv_mem_file.close() - - return csv_out - - @staticmethod - def __packValues(csv_mem_file, results, params): - - writer = csv.writer(csv_mem_file) - - headers = [ - # Primary - "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform", - "sea_surface_salinity (1e-3)", "sea_surface_temperature (degree_C)", "wind_speed (m s-1)", "wind_direction", - "wind_u (m s-1)", "wind_v (m s-1)", - # Match - "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform", - "depth (m)", "sea_water_salinity (1e-3)", - "sea_water_temperature (degree_C)", "wind_speed (m s-1)", - "wind_direction", "wind_u (m s-1)", "wind_v (m s-1)" - ] - - writer.writerow(headers) - - # - # Only include the depth variable related to the match-up parameter. If the match-up parameter - # is not sss or sst then do not include any depth data, just fill values. - # - if params["parameter"] == "sss": - depth = "sea_water_salinity_depth" - elif params["parameter"] == "sst": - depth = "sea_water_temperature_depth" - else: - depth = "NO_DEPTH" - - for primaryValue in results: - for matchup in primaryValue["matches"]: - row = [ - # Primary - primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]), - primaryValue["time"].strftime(ISO_8601), primaryValue["platform"], - primaryValue.get("sea_water_salinity", ""), primaryValue.get("sea_water_temperature", ""), - primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""), - primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""), - - # Matchup - matchup["id"], matchup["source"], matchup["x"], matchup["y"], - matchup["time"].strftime(ISO_8601), matchup["platform"], - matchup.get(depth, ""), matchup.get("sea_water_salinity", ""), - matchup.get("sea_water_temperature", ""), - matchup.get("wind_speed", ""), matchup.get("wind_direction", ""), - matchup.get("wind_u", ""), matchup.get("wind_v", ""), - ] - writer.writerow(row) - - @staticmethod - def __addConstants(csvfile): - - global_attrs = [ - {"Global Attribute": "product_version", "Value": "1.0"}, - {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"}, - {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"}, - {"Global Attribute": "history", - "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"}, - {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"}, - {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"}, - {"Global Attribute": "standard_name_vocabulary", - "Value": "CF Standard Name Table v27, BODC controlled vocabulary"}, - {"Global Attribute": "cdm_data_type", "Value": 
"Point/Profile, Swath/Grid"}, - {"Global Attribute": "processing_level", "Value": "4"}, - {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"}, - {"Global Attribute": "keywords_vocabulary", - "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"}, - # TODO What should the keywords be? - {"Global Attribute": "keywords", "Value": "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, " - "NASA/JPL/PODAAC, FSU/COAPS, UCAR/NCAR, SALINITY, " - "SEA SURFACE TEMPERATURE, SURFACE WINDS"}, - {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"}, - {"Global Attribute": "creator_email", "Value": "podaac@podaac.jpl.nasa.gov"}, - {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"}, - {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"}, - {"Global Attribute": "publisher_email", "Value": "podaac@podaac.jpl.nasa.gov"}, - {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"}, - {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."}, - ] - - writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys())) - - writer.writerows(global_attrs) - - @staticmethod - def __addDynamicAttrs(csvfile, executionId, results, params, details): - - platforms = set() - for primaryValue in results: - platforms.add(primaryValue['platform']) - for match in primaryValue['matches']: - platforms.add(match['platform']) - - # insituDatasets = params["matchup"].split(",") - insituDatasets = params["matchup"] - insituLinks = set() - for insitu in insituDatasets: - insituLinks.add(config.METADATA_LINKS[insitu]) - - - global_attrs = [ - {"Global Attribute": "Platform", "Value": ', '.join(platforms)}, - {"Global Attribute": "time_coverage_start", - "Value": params["startTime"].strftime(ISO_8601)}, - {"Global Attribute": "time_coverage_end", - "Value": params["endTime"].strftime(ISO_8601)}, - {"Global Attribute": "time_coverage_resolution", "Value": "point"}, - - {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]}, - {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]}, - {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]}, - {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]}, - {"Global Attribute": "geospatial_lat_resolution", "Value": "point"}, - {"Global Attribute": "geospatial_lon_resolution", "Value": "point"}, - {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"}, - {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"}, - - {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]}, - {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]}, - {"Global Attribute": "geospatial_vertical_units", "Value": "m"}, - {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"}, - {"Global Attribute": "geospatial_vertical_positive", "Value": "down"}, - - {"Global Attribute": "DOMS_matchID", "Value": executionId}, - {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60}, - {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"}, - - {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]}, - {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]}, - {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"}, - - {"Global Attribute": "DOMS_DatasetMetadata", "Value": ', 
'.join(insituLinks)}, - {"Global Attribute": "DOMS_primary", "Value": params["primary"]}, - {"Global Attribute": "DOMS_match_up", "Value": params["matchup"]}, - {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")}, - - {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]}, - {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"}, - {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]}, - {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]}, - - {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)}, - {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)}, - - {"Global Attribute": "URI_Matchup", "Value": "http://{webservice}/domsresults?id=" + executionId + "&output=CSV"}, - ] - - writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys())) - - writer.writerows(global_attrs) - - -class DomsNetCDFFormatter: - @staticmethod - def create(executionId, results, params, details): - - t = tempfile.mkstemp(prefix="doms_", suffix=".nc") - tempFileName = t[1] - - dataset = Dataset(tempFileName, "w", format="NETCDF4") - dataset.DOMS_matchID = executionId - DomsNetCDFFormatter.__addNetCDFConstants(dataset) - - dataset.date_modified = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601) - dataset.date_created = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601) - dataset.time_coverage_start = params["startTime"].strftime(ISO_8601) - dataset.time_coverage_end = params["endTime"].strftime(ISO_8601) - dataset.time_coverage_resolution = "point" - dataset.DOMS_match_up = params["matchup"] - dataset.DOMS_num_matchup_matched = details["numInSituMatched"] - dataset.DOMS_num_primary_matched = details["numGriddedMatched"] - - bbox = geo.BoundingBox(asString=params["bbox"]) - dataset.geospatial_lat_max = bbox.north - dataset.geospatial_lat_min = bbox.south - dataset.geospatial_lon_max = bbox.east - dataset.geospatial_lon_min = bbox.west - dataset.geospatial_lat_resolution = "point" - dataset.geospatial_lon_resolution = "point" - dataset.geospatial_lat_units = "degrees_north" - dataset.geospatial_lon_units = "degrees_east" - dataset.geospatial_vertical_min = float(params["depthMin"]) - dataset.geospatial_vertical_max = float(params["depthMax"]) - dataset.geospatial_vertical_units = "m" - dataset.geospatial_vertical_resolution = "point" - dataset.geospatial_vertical_positive = "down" - - dataset.DOMS_TimeWindow = params["timeTolerance"] / 60 / 60 - dataset.DOMS_TimeWindow_Units = "hours" - dataset.DOMS_SearchRadius = float(params["radiusTolerance"]) - dataset.DOMS_SearchRadius_Units = "m" - # dataset.URI_Subset = "http://webservice subsetting query request" - dataset.URI_Matchup = "http://{webservice}/domsresults?id=" + executionId + "&output=NETCDF" - dataset.DOMS_ParameterPrimary = params["parameter"] if "parameter" in params else "" - dataset.DOMS_platforms = params["platforms"] - dataset.DOMS_primary = params["primary"] - dataset.DOMS_time_to_complete = details["timeToComplete"] - dataset.DOMS_time_to_complete_units = "seconds" - - insituDatasets = params["matchup"] - insituLinks = set() - for insitu in insituDatasets: - insituLinks.add(config.METADATA_LINKS[insitu]) - dataset.DOMS_DatasetMetadata = ', '.join(insituLinks) - - platforms = set() - for primaryValue in results: - platforms.add(primaryValue['platform']) - for match in 
primaryValue['matches']: - platforms.add(match['platform']) - dataset.platform = ', '.join(platforms) - - satellite_group_name = "SatelliteData" - insitu_group_name = "InsituData" - - #Create Satellite group, variables, and attributes - satelliteGroup = dataset.createGroup(satellite_group_name) - satelliteWriter = DomsNetCDFValueWriter(satelliteGroup, params["parameter"]) - - # Create InSitu group, variables, and attributes - insituGroup = dataset.createGroup(insitu_group_name) - insituWriter = DomsNetCDFValueWriter(insituGroup, params["parameter"]) - - # Add data to Insitu and Satellite groups, generate array of match ID pairs - matches = DomsNetCDFFormatter.__writeResults(results, satelliteWriter, insituWriter) - dataset.createDimension("MatchedRecords", size=None) - dataset.createDimension("MatchedGroups", size=2) - matchArray = dataset.createVariable("matchIDs", "f4", ("MatchedRecords", "MatchedGroups")) - matchArray[:] = matches - - dataset.close() - f = open(tempFileName, "rb") - data = f.read() - f.close() - os.unlink(tempFileName) - return data - - @staticmethod - def __addNetCDFConstants(dataset): - dataset.product_version = "1.0" - dataset.Conventions = "CF-1.6, ACDD-1.3" - dataset.title = "DOMS satellite-insitu machup output file" - dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03" - dataset.institution = "JPL, FSU, NCAR" - dataset.source = "doms.jpl.nasa.gov" - dataset.standard_name_vocabulary = "CF Standard Name Table v27", "BODC controlled vocabulary" - dataset.cdm_data_type = "Point/Profile, Swath/Grid" - dataset.processing_level = "4" - dataset.project = "Distributed Oceanographic Matchup System (DOMS)" - dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords" - dataset.keywords = "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, NASA/JPL/PODAAC, " \ - "FSU/COAPS, UCAR/NCAR, SALINITY, SEA SURFACE TEMPERATURE, SURFACE WINDS" - dataset.creator_name = "NASA PO.DAAC" - dataset.creator_email = "podaac@podaac.jpl.nasa.gov" - dataset.creator_url = "https://podaac.jpl.nasa.gov/" - dataset.publisher_name = "NASA PO.DAAC" - dataset.publisher_email = "podaac@podaac.jpl.nasa.gov" - dataset.publisher_url = "https://podaac.jpl.nasa.gov" - dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N." 
- - @staticmethod - def __writeResults(results, satelliteWriter, insituWriter): - ids = {} - matches = [] - insituIndex = 0 - - # - # Loop through all of the results, add each satellite data point to the array - # - for r in range(0, len(results)): - result = results[r] - satelliteWriter.addData(result) - - # Add each match only if it is not already in the array of in situ points - for match in result["matches"]: - if match["id"] not in ids: - ids[match["id"]] = insituIndex - insituIndex += 1 - insituWriter.addData(match) - - # Append an index pait of (satellite, in situ) to the array of matches - matches.append((r, ids[match["id"]])) - - # Add data/write to the netCDF file - satelliteWriter.writeGroup() - insituWriter.writeGroup() - - return matches - - -class DomsNetCDFValueWriter: - def __init__(self, group, matchup_parameter): - group.createDimension("dim", size=None) - self.group = group - - self.lat = [] - self.lon = [] - self.time = [] - self.sea_water_salinity = [] - self.wind_speed = [] - self.wind_u = [] - self.wind_v = [] - self.wind_direction = [] - self.sea_water_temperature = [] - self.depth = [] - - self.satellite_group_name = "SatelliteData" - self.insitu_group_name = "InsituData" - - # - # Only include the depth variable related to the match-up parameter. If the match-up parameter is - # not sss or sst then do not include any depth data, just fill values. - # - if matchup_parameter == "sss": - self.matchup_depth = "sea_water_salinity_depth" - elif matchup_parameter == "sst": - self.matchup_depth = "sea_water_temperature_depth" - else: - self.matchup_depth = "NO_DEPTH" - - def addData(self, value): - self.lat.append(value.get("y", None)) - self.lon.append(value.get("x", None)) - self.time.append(time.mktime(value.get("time").timetuple())) - self.sea_water_salinity.append(value.get("sea_water_salinity", None)) - self.wind_speed.append(value.get("wind_speed", None)) - self.wind_u.append(value.get("wind_u", None)) - self.wind_v.append(value.get("wind_v", None)) - self.wind_direction.append(value.get("wind_direction", None)) - self.sea_water_temperature.append(value.get("sea_water_temperature", None)) - self.depth.append(value.get(self.matchup_depth, None)) - - def writeGroup(self): - # - # Create variables, enrich with attributes, and add data - # - lonVar = self.group.createVariable("lon", "f4", ("dim",), fill_value=-32767.0) - latVar = self.group.createVariable("lat", "f4", ("dim",), fill_value=-32767.0) - timeVar = self.group.createVariable("time", "f4", ("dim",), fill_value=-32767.0) - - self.__enrichLon(lonVar, min(self.lon), max(self.lon)) - self.__enrichLat(latVar, min(self.lat), max(self.lat)) - self.__enrichTime(timeVar) - - latVar[:] = self.lat - lonVar[:] = self.lon - timeVar[:] = self.time - - if self.sea_water_salinity.count(None) != len(self.sea_water_salinity): - if self.group.name == self.satellite_group_name: - sssVar = self.group.createVariable("SeaSurfaceSalinity", "f4", ("dim",), fill_value=-32767.0) - self.__enrichSSSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity)) - else: # group.name == self.insitu_group_name - sssVar = self.group.createVariable("SeaWaterSalinity", "f4", ("dim",), fill_value=-32767.0) - self.__enrichSWSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity)) - sssVar[:] = self.sea_water_salinity - - if self.wind_speed.count(None) != len(self.wind_speed): - windSpeedVar = self.group.createVariable("WindSpeed", "f4", ("dim",), fill_value=-32767.0) - 
self.__enrichWindSpeed(windSpeedVar, self.__calcMin(self.wind_speed), max(self.wind_speed)) - windSpeedVar[:] = self.wind_speed - - if self.wind_u.count(None) != len(self.wind_u): - windUVar = self.group.createVariable("WindU", "f4", ("dim",), fill_value=-32767.0) - windUVar[:] = self.wind_u - self.__enrichWindU(windUVar, self.__calcMin(self.wind_u), max(self.wind_u)) - - if self.wind_v.count(None) != len(self.wind_v): - windVVar = self.group.createVariable("WindV", "f4", ("dim",), fill_value=-32767.0) - windVVar[:] = self.wind_v - self.__enrichWindV(windVVar, self.__calcMin(self.wind_v), max(self.wind_v)) - - if self.wind_direction.count(None) != len(self.wind_direction): - windDirVar = self.group.createVariable("WindDirection", "f4", ("dim",), fill_value=-32767.0) - windDirVar[:] = self.wind_direction - self.__enrichWindDir(windDirVar) - - if self.sea_water_temperature.count(None) != len(self.sea_water_temperature): - if self.group.name == self.satellite_group_name: - tempVar = self.group.createVariable("SeaSurfaceTemp", "f4", ("dim",), fill_value=-32767.0) - self.__enrichSurfaceTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature)) - else: - tempVar = self.group.createVariable("SeaWaterTemp", "f4", ("dim",), fill_value=-32767.0) - self.__enrichWaterTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature)) - tempVar[:] = self.sea_water_temperature - - if self.group.name == self.insitu_group_name: - depthVar = self.group.createVariable("Depth", "f4", ("dim",), fill_value=-32767.0) - - if self.depth.count(None) != len(self.depth): - self.__enrichDepth(depthVar, self.__calcMin(self.depth), max(self.depth)) - depthVar[:] = self.depth - else: - # If depth has no data, set all values to 0 - tempDepth = [0 for x in range(len(self.depth))] - depthVar[:] = tempDepth - - # - # Lists may include 'None" values, to calc min these must be filtered out - # - @staticmethod - def __calcMin(var): - return min(x for x in var if x is not None) - - - # - # Add attributes to each variable - # - @staticmethod - def __enrichLon(var, var_min, var_max): - var.long_name = "Longitude" - var.standard_name = "longitude" - var.axis = "X" - var.units = "degrees_east" - var.valid_min = var_min - var.valid_max = var_max - - @staticmethod - def __enrichLat(var, var_min, var_max): - var.long_name = "Latitude" - var.standard_name = "latitude" - var.axis = "Y" - var.units = "degrees_north" - var.valid_min = var_min - var.valid_max = var_max - - @staticmethod - def __enrichTime(var): - var.long_name = "Time" - var.standard_name = "time" - var.axis = "T" - var.units = "seconds since 1970-01-01 00:00:00 0:00" - - @staticmethod - def __enrichSSSMeasurements(var, var_min, var_max): - var.long_name = "Sea surface salinity" - var.standard_name = "sea_surface_salinity" - var.units = "1e-3" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat time" - - @staticmethod - def __enrichSWSMeasurements(var, var_min, var_max): - var.long_name = "Sea water salinity" - var.standard_name = "sea_water_salinity" - var.units = "1e-3" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat depth time" - - @staticmethod - def __enrichDepth(var, var_min, var_max): - var.valid_min = var_min - var.valid_max = var_max - var.units = "m" - var.long_name = "Depth" - var.standard_name = "depth" - var.axis = "Z" - var.positive = "Down" - - @staticmethod - def __enrichWindSpeed(var, var_min, var_max): - var.long_name = "Wind speed" - 
var.standard_name = "wind_speed" - var.units = "m s-1" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat depth time" - - @staticmethod - def __enrichWindU(var, var_min, var_max): - var.long_name = "Eastward wind" - var.standard_name = "eastward_wind" - var.units = "m s-1" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat depth time" - - @staticmethod - def __enrichWindV(var, var_min, var_max): - var.long_name = "Northward wind" - var.standard_name = "northward_wind" - var.units = "m s-1" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat depth time" - - @staticmethod - def __enrichWaterTemp(var, var_min, var_max): - var.long_name = "Sea water temperature" - var.standard_name = "sea_water_temperature" - var.units = "degree_C" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat depth time" - - @staticmethod - def __enrichSurfaceTemp(var, var_min, var_max): - var.long_name = "Sea surface temperature" - var.standard_name = "sea_surface_temperature" - var.units = "degree_C" - var.valid_min = var_min - var.valid_max = var_max - var.coordinates = "lon lat time" - - @staticmethod - def __enrichWindDir(var): - var.long_name = "Wind from direction" - var.standard_name = "wind_from_direction" - var.units = "degree" - var.coordinates = "lon lat depth time" diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py deleted file mode 100644 index ac7f263..0000000 --- a/analysis/webservice/algorithms/doms/DatasetListQuery.py +++ /dev/null @@ -1,116 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import json -import traceback - -import requests - -import BaseDomsHandler -import config -import values -from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler -from webservice.NexusHandler import nexus_handler -from webservice.webmodel import cached - - -@nexus_handler -class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS Dataset Listing" - path = "/domslist" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseHandler.__init__(self) - - def getFacetsForInsituSource(self, source): - url = source["url"] - - params = { - "facet": "true", - "stats": "true", - "startIndex": 0, - "itemsPerPage": 0 - } - try: - r = requests.get(url, params=params) - results = json.loads(r.text) - - depths = None - if "stats_fields" in results and "depth" in results["stats_fields"]: - depths = results["stats_fields"]["depth"] - - for facet in results["facets"]: - field = facet["field"] - for value in facet["values"]: - value["value"] = values.getDescByListNameAndId(field, int(value["value"])) - - return depths, results["facets"] - except: # KMG: Don't eat the exception. Add better handling... - traceback.print_exc() - return None, None - - def getMetadataUrlForDataset(self, dataset): - datasetSpec = config.getEndpointByName(dataset) - if datasetSpec is not None: - return datasetSpec["metadataUrl"] - else: - - # KMG: NOT a good hack - if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM": - dataset = "MUR-JPL-L4-GLOB-v4.1" - elif dataset == "SMAP_L2B_SSS": - dataset = "JPL_SMAP-SSS_L2_EVAL-V2" - elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM": - dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0" - - return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset - - def getMetadataForSource(self, dataset): - try: - r = requests.get(self.getMetadataUrlForDataset(dataset)) - results = json.loads(r.text) - return results - except: - return None - - @cached(ttl=(60 * 60 * 1000)) # 1 hour cached - def calc(self, computeOptions, **args): - - satellitesList = self._get_tile_service().get_dataseries_list(simple=True) - - insituList = [] - - for satellite in satellitesList: - satellite["metadata"] = self.getMetadataForSource(satellite["shortName"]) - - for insitu in config.ENDPOINTS: - depths, facets = self.getFacetsForInsituSource(insitu) - insituList.append({ - "name": insitu["name"], - "endpoint": insitu["url"], - "metadata": self.getMetadataForSource(insitu["name"]), - "depths": depths, - "facets": facets - }) - - values = { - "satellite": satellitesList, - "insitu": insituList - } - - return BaseDomsHandler.DomsQueryResults(results=values) diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py deleted file mode 100644 index 2d429ca..0000000 --- a/analysis/webservice/algorithms/doms/DomsInitialization.py +++ /dev/null @@ -1,164 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - - -import ConfigParser -import logging - -import pkg_resources -from cassandra.cluster import Cluster -from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy - -from webservice.NexusHandler import nexus_initializer - -@nexus_initializer -class DomsInitializer: - def __init__(self): - pass - - def init(self, config): - log = logging.getLogger(__name__) - log.info("*** STARTING DOMS INITIALIZATION ***") - - domsconfig = ConfigParser.SafeConfigParser() - domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini')) - - cassHost = domsconfig.get("cassandra", "host") - cassPort = domsconfig.get("cassandra", "port") - cassKeyspace = domsconfig.get("cassandra", "keyspace") - cassDatacenter = domsconfig.get("cassandra", "local_datacenter") - cassVersion = int(domsconfig.get("cassandra", "protocol_version")) - cassPolicy = domsconfig.get("cassandra", "dc_policy") - - log.info("Cassandra Host(s): %s" % (cassHost)) - log.info("Cassandra Keyspace: %s" % (cassKeyspace)) - log.info("Cassandra Datacenter: %s" % (cassDatacenter)) - log.info("Cassandra Protocol Version: %s" % (cassVersion)) - log.info("Cassandra DC Policy: %s" % (cassPolicy)) - - if cassPolicy == 'DCAwareRoundRobinPolicy': - dc_policy = DCAwareRoundRobinPolicy(cassDatacenter) - elif cassPolicy == 'WhiteListRoundRobinPolicy': - dc_policy = WhiteListRoundRobinPolicy([cassHost]) - token_policy = TokenAwarePolicy(dc_policy) - - with Cluster([host for host in cassHost.split(',')], port=int(cassPort), load_balancing_policy=token_policy, - protocol_version=cassVersion) as cluster: - session = cluster.connect() - - self.createKeyspace(session, cassKeyspace) - self.createTables(session) - - def createKeyspace(self, session, cassKeyspace): - log = logging.getLogger(__name__) - log.info("Verifying DOMS keyspace '%s'" % cassKeyspace) - session.execute( - "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace) - session.set_keyspace(cassKeyspace) - - def createTables(self, session): - log = logging.getLogger(__name__) - log.info("Verifying DOMS tables") - self.createDomsExecutionsTable(session) - self.createDomsParamsTable(session) - self.createDomsDataTable(session) - self.createDomsExecutionStatsTable(session) - - def createDomsExecutionsTable(self, session): - log = logging.getLogger(__name__) - log.info("Verifying doms_executions table") - cql = """ - CREATE TABLE IF NOT EXISTS doms_executions ( - id uuid PRIMARY KEY, - time_started timestamp, - time_completed timestamp, - user_email text - ); - """ - session.execute(cql) - - def createDomsParamsTable(self, session): - log = logging.getLogger(__name__) - log.info("Verifying doms_params table") - cql = """ - CREATE TABLE IF NOT EXISTS doms_params ( - execution_id uuid PRIMARY KEY, - primary_dataset text, - matchup_datasets text, - depth_tolerance decimal, - depth_min decimal, - depth_max decimal, - time_tolerance int, - radius_tolerance decimal, - start_time timestamp, - end_time timestamp, - platforms text, - bounding_box text, - parameter text - ); - """ - 
session.execute(cql) - - def createDomsDataTable(self, session): - log = logging.getLogger(__name__) - log.info("Verifying doms_data table") - cql = """ - CREATE TABLE IF NOT EXISTS doms_data ( - id uuid, - execution_id uuid, - value_id text, - primary_value_id text, - is_primary boolean, - x decimal, - y decimal, - source_dataset text, - measurement_time timestamp, - platform text, - device text, - measurement_values map, - PRIMARY KEY (execution_id, is_primary, id) - ); - """ - session.execute(cql) - - def createDomsExecutionStatsTable(self, session): - log = logging.getLogger(__name__) - log.info("Verifying doms_execuction_stats table") - cql = """ - CREATE TABLE IF NOT EXISTS doms_execution_stats ( - execution_id uuid PRIMARY KEY, - num_gridded_matched int, - num_gridded_checked int, - num_insitu_matched int, - num_insitu_checked int, - time_to_complete int - ); - """ - session.execute(cql) - - @staticmethod - def _get_config_files(filename): - log = logging.getLogger(__name__) - candidates = [] - extensions = ['.default', ''] - for extension in extensions: - try: - candidate = pkg_resources.resource_filename(__name__, filename + extension) - candidates.append(candidate) - except KeyError as ke: - log.warning('configuration file {} not found'.format(filename + extension)) - - return candidates diff --git a/analysis/webservice/algorithms/doms/MatchupQuery.py b/analysis/webservice/algorithms/doms/MatchupQuery.py deleted file mode 100644 index 57a0834..0000000 --- a/analysis/webservice/algorithms/doms/MatchupQuery.py +++ /dev/null @@ -1,452 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
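The DomsInitializer removed above reads domsconfig.ini (falling back to the packaged .default file), then creates the DOMS keyspace and tables with IF NOT EXISTS so startup is idempotent. A minimal Python 3 sketch of the same bootstrap, assuming the cassandra-driver package and a reachable single-node cluster (host and keyspace values are illustrative):

    # Minimal sketch: idempotent Cassandra bootstrap (keyspace + one table).
    # Assumes cassandra-driver; host/keyspace names are illustrative.
    import configparser
    from cassandra.cluster import Cluster

    config = configparser.ConfigParser()
    # Later files override earlier ones, so a local ini wins over the packaged default.
    config.read(["domsconfig.ini.default", "domsconfig.ini"])
    host = config.get("cassandra", "host", fallback="localhost")
    keyspace = config.get("cassandra", "keyspace", fallback="doms")

    with Cluster(host.split(","), port=config.getint("cassandra", "port", fallback=9042)) as cluster:
        session = cluster.connect()
        session.execute(
            "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = "
            "{'class': 'SimpleStrategy', 'replication_factor': 1}" % keyspace)
        session.set_keyspace(keyspace)
        session.execute("""
            CREATE TABLE IF NOT EXISTS doms_executions (
                id uuid PRIMARY KEY,
                time_started timestamp,
                time_completed timestamp,
                user_email text
            )""")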
- -import math -import uuid -from datetime import datetime - -import numpy as np -import utm -from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon -from scipy import spatial - -import BaseDomsHandler -import ResultsStorage -import datafetch -import fetchedgeimpl -import geo -import workerthread -from webservice.NexusHandler import nexus_handler - - -@nexus_handler -class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "Experimental Combined DOMS In-Situ Matchup" - path = "/domsmatchup" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) - - def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms): - - boundsConstrainer = geo.BoundsConstrainer(asString=bbox) - threads = [] - for endpoint in endpoints: - thread = workerthread.WorkerThread(datafetch.fetchData, - params=(endpoint, startTime, endTime, bbox, depth_min, depth_max)) - threads.append(thread) - workerthread.wait(threads, startFirst=True, poll=0.01) - - data2 = [] - for thread in threads: - data, bounds = thread.results - data2 += data - boundsConstrainer.testOtherConstrainer(bounds) - - return data2, boundsConstrainer - - def __parseDatetime(self, dtString): - dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") - epoch = datetime.utcfromtimestamp(0) - time = (dt - epoch).total_seconds() * 1000.0 - return time - - def calc(self, computeOptions, **args): - primary = computeOptions.get_argument("primary", None) - matchup = computeOptions.get_argument("matchup", None) - startTime = computeOptions.get_argument("s", None) - endTime = computeOptions.get_argument("e", None) - bbox = computeOptions.get_argument("b", None) - timeTolerance = computeOptions.get_float_arg("tt") - depth_min = computeOptions.get_float_arg("depthMin", default=None) - depth_max = computeOptions.get_float_arg("depthMax", default=None) - radiusTolerance = computeOptions.get_float_arg("rt") - platforms = computeOptions.get_argument("platforms", None) - - if primary is None or len(primary) == 0: - raise Exception("No primary dataset specified") - - if matchup is None or len(matchup) == 0: - raise Exception("No matchup datasets specified") - - start = self._now() - - primarySpec = self.getDataSourceByName(primary) - if primarySpec is None: - raise Exception("Specified primary dataset not found using identifier '%s'" % primary) - - primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms) - - primaryContext = MatchupContext(primaryData) - - matchupIds = matchup.split(",") - - for matchupId in matchupIds: - matchupSpec = self.getDataSourceByName(matchupId) - - if matchupSpec is not None: # Then it's in the in-situ configuration - proc = InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min, - depth_max, - platforms, timeTolerance, radiusTolerance) - proc.start() - else: # We assume it to be a Nexus tiled dataset - - ''' - Single Threaded at the moment... 
- ''' - daysinrange = self._get_tile_service().find_days_in_range_asc(bounds.south, bounds.north, bounds.west, - bounds.east, matchupId, - self.__parseDatetime(startTime) / 1000, - self.__parseDatetime(endTime) / 1000) - - tilesByDay = {} - for dayTimestamp in daysinrange: - ds1_nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box_at_time(bounds.south, bounds.north, - bounds.west, bounds.east, - matchupId, dayTimestamp) - - # print "***", type(ds1_nexus_tiles) - # print ds1_nexus_tiles[0].__dict__ - tilesByDay[dayTimestamp] = ds1_nexus_tiles - - primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance) - - matches, numMatches = primaryContext.getFinal(len(matchupIds)) - - end = self._now() - - args = { - "primary": primary, - "matchup": matchupIds, - "startTime": startTime, - "endTime": endTime, - "bbox": bbox, - "timeTolerance": timeTolerance, - "depthMin": depth_min, - "depthMax": depth_max, - "radiusTolerance": radiusTolerance, - "platforms": platforms - } - - details = { - "timeToComplete": (end - start), - "numInSituRecords": primaryContext.insituCount, - "numInSituMatched": primaryContext.insituMatches, - "numGriddedChecked": primaryContext.griddedCount, - "numGriddedMatched": primaryContext.griddedMatched - } - - with ResultsStorage.ResultsStorage() as resultsStorage: - execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start, - completeTime=end, userEmail="") - - return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, - computeOptions=None, executionId=execution_id) - - -class MatchupContextMap: - def __init__(self): - pass - - def add(self, context): - pass - - def delete(self, context): - pass - - -class MatchupContext: - def __init__(self, primaryData): - self.id = str(uuid.uuid4()) - - self.griddedCount = 0 - self.griddedMatched = 0 - - self.insituCount = len(primaryData) - self.insituMatches = 0 - - self.primary = primaryData - for r in self.primary: - r["matches"] = [] - - self.data = [] - for s in primaryData: - u = utm.from_latlon(s["y"], s["x"]) - v = (u[0], u[1], 0.0) - self.data.append(v) - - if len(self.data) > 0: - self.tree = spatial.KDTree(self.data) - else: - self.tree = None - - def getFinal(self, minMatchesToInclude): - - matched = [] - ttlMatches = 0 - for m in self.primary: - if len(m["matches"]) >= minMatchesToInclude: - matched.append(m) - ttlMatches += len(m["matches"]) - - return matched, ttlMatches - - def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance): - for r in self.primary: - foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"], - xyTolerance) - self.griddedCount += 1 - self.griddedMatched += len(foundSatNodes) - r["matches"].extend(foundSatNodes) - - def processInSitu(self, records, xyTolerance, timeTolerance): - if self.tree is not None: - for s in records: - self.insituCount += 1 - u = utm.from_latlon(s["y"], s["x"]) - coords = np.array([u[0], u[1], 0]) - ball = self.tree.query_ball_point(coords, xyTolerance) - - self.insituMatches += len(ball) - - for i in ball: - match = self.primary[i] - if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0): - match["matches"].append(s) - - def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"): - value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName) - return value - - def __checkNumber(self, value): - if isinstance(value, float) and (math.isnan(value) or value == np.nan): - 
value = None - elif value is not None: - value = float(value) - return value - - def __buildSwathIndexes(self, chunk): - latlons = [] - utms = [] - indexes = [] - for i in range(0, len(chunk.latitudes)): - _lat = chunk.latitudes[i] - if isinstance(_lat, np.ma.core.MaskedConstant): - continue - for j in range(0, len(chunk.longitudes)): - _lon = chunk.longitudes[j] - if isinstance(_lon, np.ma.core.MaskedConstant): - continue - - value = self.__getChunkValueAtIndex(chunk, (i, j)) - if isinstance(value, float) and (math.isnan(value) or value == np.nan): - continue - - u = utm.from_latlon(_lat, _lon) - v = (u[0], u[1], 0.0) - latlons.append((_lat, _lon)) - utms.append(v) - indexes.append((i, j)) - - tree = None - if len(latlons) > 0: - tree = spatial.KDTree(utms) - - chunk.swathIndexing = { - "tree": tree, - "latlons": latlons, - "indexes": indexes - } - - def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance): - foundIndexes = [] - foundLatLons = [] - - if "swathIndexing" not in chunk.__dict__: - self.__buildSwathIndexes(chunk) - - tree = chunk.swathIndexing["tree"] - if tree is not None: - indexes = chunk.swathIndexing["indexes"] - latlons = chunk.swathIndexing["latlons"] - u = utm.from_latlon(lat, lon) - coords = np.array([u[0], u[1], 0]) - ball = tree.query_ball_point(coords, xyTolerance) - for i in ball: - foundIndexes.append(indexes[i]) - foundLatLons.append(latlons[i]) - return foundIndexes, foundLatLons - - def __getChunkValueAtIndex(self, chunk, index, arrayName=None): - - if arrayName is None or arrayName == "data": - data_val = chunk.data[0][index[0]][index[1]] - else: - data_val = chunk.meta_data[arrayName][0][index[0]][index[1]] - return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('Nan') - - def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance): - timeDiff = 86400 * 365 * 1000 - foundNodes = [] - - for ts in chunksByDay: - chunks = chunksByDay[ts] - if abs((ts * 1000) - searchTime) < timeDiff: - for chunk in chunks: - indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance) - - # for index in indexes: - for i in range(0, len(indexes)): - index = indexes[i] - latlon = latlons[i] - sst = None - sss = None - windSpeed = None - windDirection = None - windU = None - windV = None - - value = self.__getChunkValueAtIndex(chunk, index) - - if isinstance(value, float) and (math.isnan(value) or value == np.nan): - continue - - if "GHRSST" in source: - sst = value - elif "ASCATB" in source: - windU = value - elif "SSS" in source: # SMAP - sss = value - - if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data: - windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir")) - if len(chunks) > 0 and "wind_v" in chunks[0].meta_data: - windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v")) - if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data: - windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed")) - - foundNode = { - "sea_water_temperature": sst, - "sea_water_salinity": sss, - "wind_speed": windSpeed, - "wind_direction": windDirection, - "wind_u": windU, - "wind_v": windV, - "time": ts, - "x": self.__checkNumber(latlon[1]), - "y": self.__checkNumber(latlon[0]), - "depth": 0, - "sea_water_temperature_depth": 0, - "source": source, - "id": "%s:%s:%s" % (ts, lat, lon) - } - - foundNodes.append(foundNode) - timeDiff = abs(ts - searchTime) - - return foundNodes - - def 
__getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime): - - timeDiff = 86400 * 365 * 1000 - foundNodes = [] - - for ts in chunksByDay: - chunks = chunksByDay[ts] - # print chunks - # ts = calendar.timegm(chunks.start.utctimetuple()) * 1000 - if abs((ts * 1000) - searchTime) < timeDiff: - value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data") - value = self.__checkNumber(value) - - # _Really_ don't like doing it this way... - - sst = None - sss = None - windSpeed = None - windDirection = None - windU = None - windV = None - - if "GHRSST" in source: - sst = value - - if "ASCATB" in source: - windU = value - - if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data: - windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir")) - if len(chunks) > 0 and "wind_v" in chunks[0].meta_data: - windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v")) - if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data: - windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed")) - - foundNode = { - "sea_water_temperature": sst, - "sea_water_salinity": sss, - "wind_speed": windSpeed, - "wind_direction": windDirection, - "wind_uv": { - "u": windU, - "v": windV - }, - "time": ts, - "x": lon, - "y": lat, - "depth": 0, - "sea_water_temperature_depth": 0, - "source": source, - "id": "%s:%s:%s" % (ts, lat, lon) - } - - isValidNode = True - if "ASCATB" in source and windSpeed is None: - isValidNode = None - - if isValidNode: - foundNodes.append(foundNode) - timeDiff = abs(ts - searchTime) - - return foundNodes - - -class InsituDatasetProcessor: - def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance, - radiusTolerance): - self.primary = primary - self.datasource = datasource - self.startTime = startTime - self.endTime = endTime - self.bbox = bbox - self.depth_min = depth_min - self.depth_max = depth_max - self.platforms = platforms - self.timeTolerance = timeTolerance - self.radiusTolerance = radiusTolerance - - def start(self): - def callback(pageData): - self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance) - - fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max, - self.platforms, pageCallback=callback) - - -class InsituPageProcessor: - def __init__(self): - pass diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py deleted file mode 100644 index aa24d91..0000000 --- a/analysis/webservice/algorithms/doms/MetadataQuery.py +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
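The MatchupContext removed above does its spatial matching by projecting points to UTM metres with utm.from_latlon, loading (easting, northing, 0) triples into a scipy KDTree, then pulling candidates inside the radius tolerance with query_ball_point before applying the time tolerance. A condensed sketch of that matching step, assuming the utm and scipy packages (the point lists are illustrative, and like the original it ignores the UTM-zone caveat):

    # Minimal sketch: radius matchup of two point sets via a KD-tree in UTM metres,
    # followed by a time-tolerance filter. Assumes the utm and scipy packages.
    import utm
    import numpy as np
    from scipy import spatial

    primary = [{"y": 30.10, "x": -40.20, "time": 1469200000000, "matches": []},
               {"y": 30.40, "x": -40.50, "time": 1469203600000, "matches": []}]
    secondary = [{"y": 30.11, "x": -40.19, "time": 1469200300000}]

    def to_utm(p):
        easting, northing, _, _ = utm.from_latlon(p["y"], p["x"])
        return (easting, northing, 0.0)

    tree = spatial.KDTree([to_utm(p) for p in primary])

    radius_m, time_tolerance_s = 5000.0, 3600
    for s in secondary:
        for i in tree.query_ball_point(np.array(to_utm(s)), radius_m):
            if abs(primary[i]["time"] - s["time"]) <= time_tolerance_s * 1000:  # times in ms
                primary[i]["matches"].append(s)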
- -import json - -import requests - -import BaseDomsHandler -import config -from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler -from webservice.NexusHandler import nexus_handler -from webservice.webmodel import DatasetNotFoundException - - -@nexus_handler -class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS Metadata Listing" - path = "/domsmetadata" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseHandler.__init__(self) - - def calc(self, computeOptions, **args): - - dataset = computeOptions.get_argument("dataset", None) - if dataset is None or len(dataset) == 0: - raise Exception("'dataset' parameter not specified") - - metadataUrl = self.__getUrlForDataset(dataset) - - try: - r = requests.get(metadataUrl) - results = json.loads(r.text) - return BaseDomsHandler.DomsQueryResults(results=results) - except: - raise DatasetNotFoundException("Dataset '%s' not found") - - def __getUrlForDataset(self, dataset): - datasetSpec = config.getEndpointByName(dataset) - if datasetSpec is not None: - return datasetSpec["metadataUrl"] - else: - - # KMG: NOT a good hack - if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM": - dataset = "MUR-JPL-L4-GLOB-v4.1" - elif dataset == "SMAP_L2B_SSS": - dataset = "JPL_SMAP-SSS_L2_EVAL-V2" - - return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset diff --git a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py deleted file mode 100644 index 1b48d14..0000000 --- a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py +++ /dev/null @@ -1,55 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
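The MetadataQuery handler removed above resolves a dataset short name (with a couple of hard-coded aliases) to a UMM-JSON metadata URL and fetches it with requests, swallowing failures in a bare except. A small sketch of the same lookup with explicit error handling, assuming the requests package (the URL template and one alias are taken from the deleted code; everything else is illustrative):

    # Minimal sketch: fetch UMM-JSON metadata for a dataset short name.
    # Assumes the requests package; the alias table mirrors the deleted handler.
    import requests

    ALIASES = {
        "SMAP_L2B_SSS": "JPL_SMAP-SSS_L2_EVAL-V2",
    }

    def fetch_metadata(short_name, timeout=30):
        short_name = ALIASES.get(short_name, short_name)
        url = "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % short_name
        try:
            r = requests.get(url, timeout=timeout)
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, ValueError):
            return None  # caller decides whether a missing record is fatal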
- -import BaseDomsHandler -import histogramplot -import mapplot -import scatterplot -from webservice.NexusHandler import nexus_handler - - -class PlotTypes: - SCATTER = "scatter" - MAP = "map" - HISTOGRAM = "histogram" - - -@nexus_handler -class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS Results Plotting" - path = "/domsplot" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) - - def calc(self, computeOptions, **args): - id = computeOptions.get_argument("id", None) - parameter = computeOptions.get_argument('parameter', 'sst') - - plotType = computeOptions.get_argument("type", PlotTypes.SCATTER) - - normAndCurve = computeOptions.get_boolean_arg("normandcurve", False) - - if plotType == PlotTypes.SCATTER: - return scatterplot.createScatterPlot(id, parameter) - elif plotType == PlotTypes.MAP: - return mapplot.createMapPlot(id, parameter) - elif plotType == PlotTypes.HISTOGRAM: - return histogramplot.createHistogramPlot(id, parameter, normAndCurve) - else: - raise Exception("Unsupported plot type '%s' specified." % plotType) diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py deleted file mode 100644 index 93358e9..0000000 --- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py +++ /dev/null @@ -1,49 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
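The DomsResultsPlotHandler removed above simply dispatches the "type" argument to a scatter, map, or histogram plotter, with histogramplot optionally overlaying a fitted normal curve. A small matplotlib/scipy sketch of that histogram-plus-curve option (the data array is illustrative):

    # Minimal sketch: histogram of matchup differences with an optional fitted
    # normal curve, mirroring the deleted histogramplot "normAndCurve" option.
    # Assumes matplotlib and scipy; 'diffs' is illustrative data.
    import numpy as np
    import matplotlib.pyplot as plt
    from scipy.stats import norm

    diffs = np.random.normal(loc=0.1, scale=0.4, size=1000)  # e.g. satellite minus in-situ SST

    plt.hist(diffs, bins=50, density=True, alpha=0.6)
    mu, sigma = norm.fit(diffs)
    xs = np.linspace(diffs.min(), diffs.max(), 200)
    plt.plot(xs, norm.pdf(xs, mu, sigma))
    plt.xlabel("difference")
    plt.ylabel("density")
    plt.savefig("histogram_sketch.png")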
- -import uuid - -import BaseDomsHandler -import ResultsStorage -from webservice.NexusHandler import nexus_handler -from webservice.webmodel import NexusProcessingException - - -@nexus_handler -class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS Resultset Retrieval" - path = "/domsresults" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) - - def calc(self, computeOptions, **args): - execution_id = computeOptions.get_argument("id", None) - - try: - execution_id = uuid.UUID(execution_id) - except: - raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400) - - simple_results = computeOptions.get_boolean_arg("simpleResults", default=False) - - with ResultsStorage.ResultsRetrieval() as storage: - params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results) - - return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None, - computeOptions=None, executionId=execution_id) diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py deleted file mode 100644 index 03bbd09..0000000 --- a/analysis/webservice/algorithms/doms/ResultsStorage.py +++ /dev/null @@ -1,286 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
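The ResultsRetrieval handler removed above validates the client-supplied "id" argument by round-tripping it through uuid.UUID so a malformed id becomes a 400-style error rather than an unhandled exception deeper in the query path. A tiny sketch of that check:

    # Minimal sketch: validate a client-supplied execution id before querying.
    import uuid

    def parse_execution_id(raw):
        try:
            return uuid.UUID(raw)
        except (ValueError, TypeError, AttributeError):
            raise ValueError("'id' argument must be a valid uuid")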
- - - -import ConfigParser -import logging -import uuid -from datetime import datetime - -import pkg_resources -from cassandra.cluster import Cluster -from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy -from cassandra.query import BatchStatement -from pytz import UTC - - -class AbstractResultsContainer: - def __init__(self): - self._log = logging.getLogger(__name__) - self._log.info("Creating DOMS Results Storage Instance") - - self._session = None - - def __enter__(self): - domsconfig = ConfigParser.RawConfigParser() - domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini') - - cassHost = domsconfig.get("cassandra", "host") - cassKeyspace = domsconfig.get("cassandra", "keyspace") - cassDatacenter = domsconfig.get("cassandra", "local_datacenter") - cassVersion = int(domsconfig.get("cassandra", "protocol_version")) - - dc_policy = DCAwareRoundRobinPolicy(cassDatacenter) - token_policy = TokenAwarePolicy(dc_policy) - - self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy, - protocol_version=cassVersion) - - self._session = self._cluster.connect(cassKeyspace) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self._cluster.shutdown() - - def _parseDatetime(self, dtString): - dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") - epoch = datetime.utcfromtimestamp(0) - time = (dt - epoch).total_seconds() * 1000.0 - return int(time) - - -class ResultsStorage(AbstractResultsContainer): - def __init__(self): - AbstractResultsContainer.__init__(self) - - def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None): - if isinstance(execution_id, basestring): - execution_id = uuid.UUID(execution_id) - - execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail) - self.__insertParams(execution_id, params) - self.__insertStats(execution_id, stats) - self.__insertResults(execution_id, results) - return execution_id - - def insertExecution(self, execution_id, startTime, completeTime, userEmail): - if execution_id is None: - execution_id = uuid.uuid4() - - cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)" - self._session.execute(cql, (execution_id, startTime, completeTime, userEmail)) - return execution_id - - def __insertParams(self, execution_id, params): - cql = """INSERT INTO doms_params - (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter) - VALUES - (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - self._session.execute(cql, (execution_id, - params["primary"], - ",".join(params["matchup"]) if type(params["matchup"]) == list else params[ - "matchup"], - params["depthMin"] if "depthMin" in params.keys() else None, - params["depthMax"] if "depthMax" in params.keys() else None, - int(params["timeTolerance"]), - params["radiusTolerance"], - params["startTime"], - params["endTime"], - params["platforms"], - params["bbox"], - params["parameter"] - )) - - def __insertStats(self, execution_id, stats): - cql = """ - INSERT INTO doms_execution_stats - (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete) - VALUES - (%s, %s, %s, %s, %s, %s) - """ - self._session.execute(cql, ( - execution_id, - stats["numGriddedMatched"], - stats["numGriddedChecked"], - stats["numInSituMatched"], 
- stats["numInSituRecords"], - stats["timeToComplete"] - )) - - def __insertResults(self, execution_id, results): - - cql = """ - INSERT INTO doms_data - (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary) - VALUES - (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """ - insertStatement = self._session.prepare(cql) - batch = BatchStatement() - - for result in results: - self.__insertResult(execution_id, None, result, batch, insertStatement) - - self._session.execute(batch) - - def __insertResult(self, execution_id, primaryId, result, batch, insertStatement): - - dataMap = self.__buildDataMap(result) - result_id = uuid.uuid4() - batch.add(insertStatement, ( - result_id, - execution_id, - result["id"], - primaryId, - result["x"], - result["y"], - result["source"], - result["time"], - result["platform"] if "platform" in result else None, - result["device"] if "device" in result else None, - dataMap, - 1 if primaryId is None else 0 - ) - ) - - n = 0 - if "matches" in result: - for match in result["matches"]: - self.__insertResult(execution_id, result["id"], match, batch, insertStatement) - n += 1 - if n >= 20: - if primaryId is None: - self.__commitBatch(batch) - n = 0 - - if primaryId is None: - self.__commitBatch(batch) - - def __commitBatch(self, batch): - self._session.execute(batch) - batch.clear() - - def __buildDataMap(self, result): - dataMap = {} - for name in result: - value = result[name] - if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type( - value) in [float, int]: - dataMap[name] = value - return dataMap - - -class ResultsRetrieval(AbstractResultsContainer): - def __init__(self): - AbstractResultsContainer.__init__(self) - - def retrieveResults(self, execution_id, trim_data=False): - if isinstance(execution_id, basestring): - execution_id = uuid.UUID(execution_id) - - params = self.__retrieveParams(execution_id) - stats = self.__retrieveStats(execution_id) - data = self.__retrieveData(execution_id, trim_data=trim_data) - return params, stats, data - - def __retrieveData(self, id, trim_data=False): - dataMap = self.__retrievePrimaryData(id, trim_data=trim_data) - self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data) - data = [dataMap[name] for name in dataMap] - return data - - def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False): - cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false" - rows = self._session.execute(cql, (id,)) - - for row in rows: - entry = self.__rowToDataEntry(row, trim_data=trim_data) - if row.primary_value_id in dataMap: - if not "matches" in dataMap[row.primary_value_id]: - dataMap[row.primary_value_id]["matches"] = [] - dataMap[row.primary_value_id]["matches"].append(entry) - else: - print row - - def __retrievePrimaryData(self, id, trim_data=False): - cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true" - rows = self._session.execute(cql, (id,)) - - dataMap = {} - for row in rows: - entry = self.__rowToDataEntry(row, trim_data=trim_data) - dataMap[row.value_id] = entry - return dataMap - - def __rowToDataEntry(self, row, trim_data=False): - if trim_data: - entry = { - "x": float(row.x), - "y": float(row.y), - "source": row.source_dataset, - "time": row.measurement_time.replace(tzinfo=UTC) - } - else: - entry = { - "id": row.value_id, - "x": float(row.x), - "y": float(row.y), - "source": row.source_dataset, - "device": row.device, - "platform": 
row.platform, - "time": row.measurement_time.replace(tzinfo=UTC) - } - for key in row.measurement_values: - value = float(row.measurement_values[key]) - entry[key] = value - return entry - - def __retrieveStats(self, id): - cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1" - rows = self._session.execute(cql, (id,)) - for row in rows: - stats = { - "numGriddedMatched": row.num_gridded_matched, - "numGriddedChecked": row.num_gridded_checked, - "numInSituMatched": row.num_insitu_matched, - "numInSituChecked": row.num_insitu_checked, - "timeToComplete": row.time_to_complete - } - return stats - - raise Exception("Execution not found with id '%s'" % id) - - def __retrieveParams(self, id): - cql = "SELECT * FROM doms_params where execution_id = %s limit 1" - rows = self._session.execute(cql, (id,)) - for row in rows: - params = { - "primary": row.primary_dataset, - "matchup": row.matchup_datasets.split(","), - "depthMin": row.depth_min, - "depthMax": row.depth_max, - "timeTolerance": row.time_tolerance, - "radiusTolerance": row.radius_tolerance, - "startTime": row.start_time.replace(tzinfo=UTC), - "endTime": row.end_time.replace(tzinfo=UTC), - "platforms": row.platforms, - "bbox": row.bounding_box, - "parameter": row.parameter - } - return params - - raise Exception("Execution not found with id '%s'" % id) diff --git a/analysis/webservice/algorithms/doms/StatsQuery.py b/analysis/webservice/algorithms/doms/StatsQuery.py deleted file mode 100644 index f5ac765..0000000 --- a/analysis/webservice/algorithms/doms/StatsQuery.py +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
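The ResultsStorage code removed above writes matchup results with a prepared INSERT and a BatchStatement, flushing roughly every 20 rows while walking each primary record and its matches. A condensed Python 3 sketch of that write path, assuming cassandra-driver, an already-open session, and a doms_data-like table (column set and types are illustrative):

    # Minimal sketch: batched inserts with a prepared statement, flushed in chunks.
    # Assumes an open cassandra-driver 'session' and an existing target table.
    import uuid
    from cassandra.query import BatchStatement

    def insert_results(session, execution_id, results, flush_every=20):
        insert = session.prepare(
            "INSERT INTO doms_data (id, execution_id, value_id, x, y, is_primary) "
            "VALUES (?, ?, ?, ?, ?, ?)")
        batch = BatchStatement()
        pending = 0
        for r in results:
            batch.add(insert, (uuid.uuid4(), execution_id, r["id"], r["x"], r["y"], True))
            pending += 1
            if pending >= flush_every:   # keep batches small; huge batches stress the coordinator
                session.execute(batch)
                batch.clear()
                pending = 0
        if pending:
            session.execute(batch)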
- -import BaseDomsHandler -import datafetch -from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler -from webservice.NexusHandler import nexus_handler - - -@nexus_handler -class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS In-Situ Stats Lookup" - path = "/domsstats" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseHandler.__init__(self) - - def calc(self, computeOptions, **args): - source = computeOptions.get_argument("source", None) - startTime = computeOptions.get_argument("s", None) - endTime = computeOptions.get_argument("e", None) - bbox = computeOptions.get_argument("b", None) - timeTolerance = computeOptions.get_float_arg("tt") - depth_min = computeOptions.get_float_arg("depthMin", default=None) - depth_max = computeOptions.get_float_arg("depthMax", default=None) - radiusTolerance = computeOptions.get_float_arg("rt") - platforms = computeOptions.get_argument("platforms", None) - - source1 = self.getDataSourceByName(source) - if source1 is None: - raise Exception("Source '%s' not found" % source) - - count, bounds = datafetch.getCount(source1, startTime, endTime, bbox, depth_min, depth_max, platforms) - - args = { - "source": source, - "startTime": startTime, - "endTime": endTime, - "bbox": bbox, - "timeTolerance": timeTolerance, - "depthMin": depth_min, - "depthMax": depth_max, - "radiusTolerance": radiusTolerance, - "platforms": platforms - } - - return BaseDomsHandler.DomsQueryResults(results={}, args=args, details={}, bounds=bounds, count=count, - computeOptions=None) diff --git a/analysis/webservice/algorithms/doms/ValuesQuery.py b/analysis/webservice/algorithms/doms/ValuesQuery.py deleted file mode 100644 index d766c7b..0000000 --- a/analysis/webservice/algorithms/doms/ValuesQuery.py +++ /dev/null @@ -1,72 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from datetime import datetime - -from pytz import timezone - -import BaseDomsHandler -import datafetch -from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler -from webservice.NexusHandler import nexus_handler - -EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) - - -@nexus_handler -class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS In-Situ Value Lookup" - path = "/domsvalues" - description = "" - params = {} - singleton = True - - def __init__(self): - BaseHandler.__init__(self) - - def calc(self, computeOptions, **args): - source = computeOptions.get_argument("source", None) - startTime = computeOptions.get_start_datetime() - endTime = computeOptions.get_end_datetime() - bbox = computeOptions.get_argument("b", None) - timeTolerance = computeOptions.get_float_arg("tt") - depth_min = computeOptions.get_float_arg("depthMin", default=None) - depth_max = computeOptions.get_float_arg("depthMax", default=None) - radiusTolerance = computeOptions.get_float_arg("rt") - platforms = computeOptions.get_argument("platforms", "") - - source1 = self.getDataSourceByName(source) - if source1 is None: - raise Exception("Source '%s' not found" % source) - - values, bounds = datafetch.getValues(source1, startTime.strftime('%Y-%m-%dT%H:%M:%SZ'), - endTime.strftime('%Y-%m-%dT%H:%M:%SZ'), bbox, depth_min, depth_max, - platforms, placeholders=True) - count = len(values) - - args = { - "source": source, - "startTime": startTime, - "endTime": endTime, - "bbox": bbox, - "timeTolerance": timeTolerance, - "depthMin": depth_min, - "depthMax": depth_max, - "radiusTolerance": radiusTolerance, - "platforms": platforms - } - - return BaseDomsHandler.DomsQueryResults(results=values, args=args, bounds=bounds, details={}, count=count, - computeOptions=None) diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py deleted file mode 100644 index d5a8e24..0000000 --- a/analysis/webservice/algorithms/doms/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -import BaseDomsHandler -import DatasetListQuery -import DomsInitialization -import MatchupQuery -import MetadataQuery -import ResultsPlotQuery -import ResultsRetrieval -import ResultsStorage -import StatsQuery -import ValuesQuery -import config -import datafetch -import fetchedgeimpl -import geo -import insitusubset -import subsetter -import values -import workerthread diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py deleted file mode 100644 index ff492e8..0000000 --- a/analysis/webservice/algorithms/doms/config.py +++ /dev/null @@ -1,109 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -ENDPOINTS = [ - { - "name": "samos", - "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 1000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json" - }, - { - "name": "spurs", - "url": "https://doms.jpl.nasa.gov/ws/search/spurs", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 25000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json" - }, - { - "name": "icoads", - "url": "http://rda-data.ucar.edu:8890/ws/search/icoads", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 1000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json" - }, - { - "name": "spurs2", - "url": "https://doms.jpl.nasa.gov/ws/search/spurs2", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 25000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json" - } -] - -METADATA_LINKS = { - "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2", - "icoads": "https://rda.ucar.edu/datasets/ds548.1/", - "spurs": "https://podaac.jpl.nasa.gov/spurs" -} - -import os - -try: - env = os.environ['ENV'] - if env == 'dev': - ENDPOINTS = [ - { - "name": "samos", - "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 1000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json" - }, - { - "name": "spurs", - "url": "http://127.0.0.1:8890/ws/search/spurs", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 25000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json" - }, - { - "name": "icoads", - "url": "http://rda-data.ucar.edu:8890/ws/search/icoads", - "fetchParallel": True, - "fetchThreads": 8, - "itemsPerPage": 1000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json" - }, - { - "name": "spurs2", - "url": "https://doms.jpl.nasa.gov/ws/search/spurs2", - "fetchParallel": 
True, - "fetchThreads": 8, - "itemsPerPage": 25000, - "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json" - } - ] - METADATA_LINKS = { - "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2", - "icoads": "https://rda.ucar.edu/datasets/ds548.1/", - "spurs": "https://podaac.jpl.nasa.gov/spurs" - } -except KeyError: - pass - - -def getEndpointByName(name): - for endpoint in ENDPOINTS: - if endpoint["name"].upper() == name.upper(): - return endpoint - return None diff --git a/analysis/webservice/algorithms/doms/datafetch.py b/analysis/webservice/algorithms/doms/datafetch.py deleted file mode 100644 index 3fc3917..0000000 --- a/analysis/webservice/algorithms/doms/datafetch.py +++ /dev/null @@ -1,47 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import fetchedgeimpl - - -def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None): - return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - - -def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None): - return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - - -def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None): - data = [] - for endpoint in endpoints: - dataSingleSource = __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - data = data + dataSingleSource - return data - - -def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None): - if type(endpoint) == list: - return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - else: - return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - - -def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False): - return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders) - - -if __name__ == "__main__": - pass diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini.default b/analysis/webservice/algorithms/doms/domsconfig.ini.default deleted file mode 100644 index d1814bf..0000000 --- a/analysis/webservice/algorithms/doms/domsconfig.ini.default +++ /dev/null @@ -1,15 +0,0 @@ -[cassandra] -host=sdap-cassandra -port=9042 -keyspace=doms -local_datacenter=datacenter1 -protocol_version=3 -dc_policy=DCAwareRoundRobinPolicy - - -[cassandraDD] -host=128.149.115.178,128.149.115.173,128.149.115.176,128.149.115.175,128.149.115.172,128.149.115.174,128.149.115.177 -keyspace=doms -local_datacenter=B600 -protocol_version=3 - diff --git 
a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py deleted file mode 100644 index 70cf14e..0000000 --- a/analysis/webservice/algorithms/doms/fetchedgeimpl.py +++ /dev/null @@ -1,217 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import traceback -from datetime import datetime -from multiprocessing.pool import ThreadPool - -import requests - -import geo -import values -from webservice.webmodel import NexusProcessingException - - -def __parseDatetime(dtString): - dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") - epoch = datetime.utcfromtimestamp(0) - time = (dt - epoch).total_seconds() * 1000.0 - return time - - -def __parseLocation(locString): - if "Point" in locString: - locString = locString[6:-1] - - if "," in locString: - latitude = float(locString.split(",")[0]) - longitude = float(locString.split(",")[1]) - else: - latitude = float(locString.split(" ")[1]) - longitude = float(locString.split(" ")[0]) - - return (latitude, longitude) - - -def __resultRawToUsable(resultdict): - resultdict["time"] = __parseDatetime(resultdict["time"]) - latitude, longitude = __parseLocation(resultdict["point"]) - - resultdict["x"] = longitude - resultdict["y"] = latitude - - if "id" not in resultdict and "metadata" in resultdict: - resultdict["id"] = resultdict["metadata"] - - resultdict["id"] = "id-%s" % resultdict["id"] - - if "device" in resultdict: - resultdict["device"] = values.getDeviceById(resultdict["device"]) - - if "platform" in resultdict: - resultdict["platform"] = values.getPlatformById(resultdict["platform"]) - - if "mission" in resultdict: - resultdict["mission"] = values.getMissionById(resultdict["mission"]) - - if "sea_surface_temperature" in resultdict: - resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"] - del resultdict["sea_surface_temperature"] - - return resultdict - - -def __fetchJson(url, params, trycount=1, maxtries=5): - if trycount > maxtries: - raise Exception("Maximum retries attempted.") - if trycount > 1: - print "Retry #", trycount - r = requests.get(url, params=params, timeout=500.000) - - print r.url - - if r.status_code != 200: - return __fetchJson(url, params, trycount + 1, maxtries) - try: - results = json.loads(r.text) - return results - except: - return __fetchJson(url, params, trycount + 1, maxtries) - - -def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0, - platforms=None, - pageCallback=None): - params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage, - "startIndex": startIndex, "stats": "true"} - - if depth_min is not None: - params['minDepth'] = depth_min - if depth_max is not None: - params['maxDepth'] = depth_max - - 
if platforms is not None: - params["platform"] = platforms.split(",") - - resultsRaw = __fetchJson(endpoint["url"], params) - boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180) - - if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0: # Double-sanity check - return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer - - try: - results = [] - for resultdict in resultsRaw["results"]: - result = __resultRawToUsable(resultdict) - result["source"] = endpoint["name"] - boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"]) - results.append(result) - - if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0: - stats = resultsRaw["stats_fields"] - if "lat" in stats and "lon" in stats: - boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'], - west=stats['lon']['min'], east=stats['lon']['max']) - - if pageCallback is not None: - pageCallback(results) - - ''' - If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it. - ''' - if pageCallback is None: - return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int( - resultsRaw["itemsPerPage"]), boundsConstrainer - else: - return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int( - resultsRaw["itemsPerPage"]), boundsConstrainer - except: - print "Invalid or missing JSON in response." - traceback.print_exc() - raise NexusProcessingException(reason="Invalid or missing JSON in response.") - # return [], 0, startIndex, itemsPerPage, boundsConstrainer - - -def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None): - startIndex = 0 - pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime, - endTime, bbox, - depth_min, depth_max, 0, - startIndex, platforms) - return totalResults, boundsConstrainer - - -def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None): - results = [] - startIndex = 0 - - mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180) - - # First isn't parellel so we can get the ttl results, forced items per page, etc... - pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime, - endTime, bbox, - depth_min, depth_max, - endpoint["itemsPerPage"], - startIndex, platforms, - pageCallback) - results = results + pageResults - mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer) - - pool = ThreadPool(processes=endpoint["fetchThreads"]) - mpResults = [pool.apply_async(__doQuery, args=( - endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in - range(len(pageResults), totalResults, itemsPerPageR)] - pool.close() - pool.join() - - ''' - If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it. 
- ''' - if pageCallback is None: - mpResults = [p.get() for p in mpResults] - for mpResult in mpResults: - results = results + mpResult[0] - mainBoundsConstrainer.testOtherConstrainer(mpResult[4]) - - return results, mainBoundsConstrainer - - -def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False): - results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms) - - if placeholders: - trimmedResults = [] - for item in results: - depth = None - if "depth" in item: - depth = item["depth"] - if "sea_water_temperature_depth" in item: - depth = item["sea_water_temperature_depth"] - - trimmedItem = { - "x": item["x"], - "y": item["y"], - "source": item["source"], - "time": item["time"], - "device": item["device"] if "device" in item else None, - "platform": item["platform"], - "depth": depth - } - trimmedResults.append(trimmedItem) - - results = trimmedResults - - return results, boundsConstrainer diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py deleted file mode 100644 index 3323f57..0000000 --- a/analysis/webservice/algorithms/doms/geo.py +++ /dev/null @@ -1,129 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import math - -MEAN_RADIUS_EARTH_METERS = 6371010.0 -EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0 -POLAR_RADIUS_EARTH_METERS = 6356752.0 -FLATTENING_EARTH = 298.257223563 -MEAN_RADIUS_EARTH_MILES = 3958.8 - - -class DistanceUnit(object): - METERS = 0 - MILES = 1 - - -# Haversine implementation for great-circle distances between two points -def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS): - if units == DistanceUnit.METERS: - R = MEAN_RADIUS_EARTH_METERS - elif units == DistanceUnit.MILES: - R = MEAN_RADIUS_EARTH_MILES - else: - raise Exception("Invalid units specified") - x0r = x0 * (math.pi / 180.0) # To radians - x1r = x1 * (math.pi / 180.0) # To radians - xd = (x1 - x0) * (math.pi / 180.0) - yd = (y1 - y0) * (math.pi / 180.0) - - a = math.sin(xd / 2.0) * math.sin(xd / 2.0) + \ - math.cos(x0r) * math.cos(x1r) * \ - math.sin(yd / 2.0) * math.sin(yd / 2.0) - c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a)) - d = R * c - return d - - -# Equirectangular approximation for when performance is key. 
Better at smaller distances -def equirectangularApprox(x0, y0, x1, y1): - R = 6371000.0 # Meters - x0r = x0 * (math.pi / 180.0) # To radians - x1r = x1 * (math.pi / 180.0) - y0r = y0 * (math.pi / 180.0) - y1r = y1 * (math.pi / 180.0) - - x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0) - y = x1r - x0r - d = math.sqrt(x * x + y * y) * R - return d - - -class BoundingBox(object): - - def __init__(self, north=None, south=None, west=None, east=None, asString=None): - if asString is not None: - bboxParts = asString.split(",") - self.west = float(bboxParts[0]) - self.south = float(bboxParts[1]) - self.east = float(bboxParts[2]) - self.north = float(bboxParts[3]) - else: - self.north = north - self.south = south - self.west = west - self.east = east - - def toString(self): - return "%s,%s,%s,%s" % (self.west, self.south, self.east, self.north) - - def toMap(self): - return { - "xmin": self.west, - "xmax": self.east, - "ymin": self.south, - "ymax": self.north - } - - -''' - Constrains, does not expand. -''' - - -class BoundsConstrainer(BoundingBox): - - def __init__(self, north=None, south=None, west=None, east=None, asString=None): - BoundingBox.__init__(self, north, south, west, east, asString) - - def testNorth(self, v): - if v is None: - return - self.north = max([self.north, v]) - - def testSouth(self, v): - if v is None: - return - self.south = min([self.south, v]) - - def testEast(self, v): - if v is None: - return - self.east = max([self.east, v]) - - def testWest(self, v): - if v is None: - return - self.west = min([self.west, v]) - - def testCoords(self, north=None, south=None, west=None, east=None): - self.testNorth(north) - self.testSouth(south) - self.testWest(west) - self.testEast(east) - - def testOtherConstrainer(self, other): - self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east) diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py deleted file mode 100644 index 1e06b66..0000000 --- a/analysis/webservice/algorithms/doms/histogramplot.py +++ /dev/null @@ -1,127 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
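The geo.py module deleted above provides the haversine great-circle distance, an equirectangular approximation for short distances, and a BoundsConstrainer that grows a bounding box to cover every coordinate it is tested against. For reference, the haversine formula can be reproduced standalone; the sketch below assumes plain latitude/longitude degrees and the same mean Earth radius constant, and is not code from the commit:

import math

MEAN_RADIUS_EARTH_METERS = 6371010.0

def haversine_meters(lat0, lon0, lat1, lon1):
    # Great-circle distance in meters between two points given in degrees.
    lat0r, lat1r = math.radians(lat0), math.radians(lat1)
    dlat = math.radians(lat1 - lat0)
    dlon = math.radians(lon1 - lon0)
    a = math.sin(dlat / 2.0) ** 2 + \
        math.cos(lat0r) * math.cos(lat1r) * math.sin(dlon / 2.0) ** 2
    return MEAN_RADIUS_EARTH_METERS * 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))

# One degree of latitude at the equator is roughly 111 km:
# haversine_meters(0.0, 0.0, 1.0, 0.0) -> ~111195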
- -import string -from cStringIO import StringIO -from multiprocessing import Process, Manager - -import matplotlib -import matplotlib.mlab as mlab -import matplotlib.pyplot as plt -import numpy as np - -import BaseDomsHandler -import ResultsStorage - -if not matplotlib.get_backend(): - matplotlib.use('Agg') - -PARAMETER_TO_FIELD = { - "sst": "sea_water_temperature", - "sss": "sea_water_salinity" -} - -PARAMETER_TO_UNITS = { - "sst": "($^\circ$C)", - "sss": "(g/L)" -} - - -class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults): - - def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, - computeOptions=None, executionId=None, plot=None): - BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds, - count=count, computeOptions=computeOptions, executionId=executionId) - self.__primary = primary - self.__secondary = secondary - self.__x = x - self.__parameter = parameter - self.__plot = plot - - def toImage(self): - return self.__plot - - -def render(d, x, primary, secondary, parameter, norm_and_curve=False): - fig, ax = plt.subplots() - fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold') - - n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75) - - if norm_and_curve: - mean = np.mean(x) - variance = np.var(x) - sigma = np.sqrt(variance) - y = mlab.normpdf(bins, mean, sigma) - l = plt.plot(bins, y, 'r--', linewidth=1) - - ax.set_title('n = %d' % len(x)) - - units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"] - ax.set_xlabel("%s - %s %s" % (primary, secondary, units)) - - if norm_and_curve: - ax.set_ylabel("Probability per unit difference") - else: - ax.set_ylabel("Frequency") - - plt.grid(True) - - sio = StringIO() - plt.savefig(sio, format='png') - d['plot'] = sio.getvalue() - - -def renderAsync(x, primary, secondary, parameter, norm_and_curve): - manager = Manager() - d = manager.dict() - p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve)) - p.start() - p.join() - return d['plot'] - - -def createHistogramPlot(id, parameter, norm_and_curve=False): - with ResultsStorage.ResultsRetrieval() as storage: - params, stats, data = storage.retrieveResults(id) - - primary = params["primary"] - secondary = params["matchup"][0] - - x = createHistTable(data, secondary, parameter) - - plot = renderAsync(x, primary, secondary, parameter, norm_and_curve) - - r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary, - args=params, details=stats, - bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) - return r - - -def createHistTable(results, secondary, parameter): - x = [] - - field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] - - for entry in results: - for match in entry["matches"]: - if match["source"] == secondary: - if field in entry and field in match: - a = entry[field] - b = match[field] - x.append((a - b)) - - return x diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py deleted file mode 100644 index 7f60e99..0000000 --- a/analysis/webservice/algorithms/doms/insitusubset.py +++ /dev/null @@ -1,263 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import StringIO -import csv -import json -import logging -from datetime import datetime - -import requests - -import BaseDomsHandler -from webservice.NexusHandler import nexus_handler -from webservice.algorithms.doms import config as edge_endpoints -from webservice.webmodel import NexusProcessingException, NoDataException - -ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' - - -@nexus_handler -class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS In Situ Subsetter" - path = "/domsinsitusubset" - description = "Subset a DOMS in situ source given the search domain." - - params = [ - { - "name": "source", - "type": "comma-delimited string", - "description": "The in situ Dataset to be sub-setted", - "required": "true", - "sample": "spurs" - }, - { - "name": "parameter", - "type": "string", - "description": "The parameter of interest. One of 'sst', 'sss', 'wind'", - "required": "false", - "default": "All", - "sample": "sss" - }, - { - "name": "startTime", - "type": "string", - "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH", - "required": "true", - "sample": "2013-10-21T00:00:00Z" - }, - { - "name": "endTime", - "type": "string", - "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH", - "required": "true", - "sample": "2013-10-31T23:59:59Z" - }, - { - "name": "b", - "type": "comma-delimited float", - "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " - "Maximum (Eastern) Longitude, Maximum (Northern) Latitude", - "required": "true", - "sample": "-30,15,-45,30" - }, - { - "name": "depthMin", - "type": "float", - "description": "Minimum depth of measurements. Must be less than depthMax", - "required": "false", - "default": "No limit", - "sample": "0" - }, - { - "name": "depthMax", - "type": "float", - "description": "Maximum depth of measurements. Must be greater than depthMin", - "required": "false", - "default": "No limit", - "sample": "5" - }, - { - "name": "platforms", - "type": "comma-delimited integer", - "description": "Platforms to include for subset consideration", - "required": "false", - "default": "All", - "sample": "1,2,3,4,5,6,7,8,9" - }, - { - "name": "output", - "type": "string", - "description": "Output type. 
Only 'CSV' or 'JSON' is currently supported", - "required": "false", - "default": "JSON", - "sample": "CSV" - } - ] - singleton = True - - def __init__(self): - BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - - def parse_arguments(self, request): - # Parse input arguments - self.log.debug("Parsing arguments") - - source_name = request.get_argument('source', None) - if source_name is None or source_name.strip() == '': - raise NexusProcessingException(reason="'source' argument is required", code=400) - - parameter_s = request.get_argument('parameter', None) - if parameter_s not in ['sst', 'sss', 'wind', None]: - raise NexusProcessingException( - reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) - - try: - start_time = request.get_start_datetime() - start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") - except: - raise NexusProcessingException( - reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - try: - end_time = request.get_end_datetime() - end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") - except: - raise NexusProcessingException( - reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - - if start_time > end_time: - raise NexusProcessingException( - reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( - request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), - code=400) - - try: - bounding_polygon = request.get_bounding_polygon() - except: - raise NexusProcessingException( - reason="'b' argument is required. 
Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", - code=400) - - depth_min = request.get_decimal_arg('depthMin', default=None) - depth_max = request.get_decimal_arg('depthMax', default=None) - - if depth_min is not None and depth_max is not None and depth_min >= depth_max: - raise NexusProcessingException( - reason="Depth Min should be less than Depth Max", code=400) - - platforms = request.get_argument('platforms', None) - if platforms is not None: - try: - p_validation = platforms.split(',') - p_validation = [int(p) for p in p_validation] - del p_validation - except: - raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) - - return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms - - def calc(self, request, **args): - - source_name, parameter_s, start_time, end_time, bounding_polygon, \ - depth_min, depth_max, platforms = self.parse_arguments(request) - - with requests.session() as edge_session: - edge_results = query_edge(source_name, parameter_s, start_time, end_time, - ','.join([str(bound) for bound in bounding_polygon.bounds]), - platforms, depth_min, depth_max, edge_session)['results'] - - if len(edge_results) == 0: - raise NoDataException - return InSituSubsetResult(results=edge_results) - - -class InSituSubsetResult(object): - def __init__(self, results): - self.results = results - - def toJson(self): - return json.dumps(self.results, indent=4) - - def toCSV(self): - fieldnames = sorted(next(iter(self.results)).keys()) - - csv_mem_file = StringIO.StringIO() - try: - writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames) - - writer.writeheader() - writer.writerows(self.results) - csv_out = csv_mem_file.getvalue() - finally: - csv_mem_file.close() - - return csv_out - - -def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000, - startIndex=0, stats=True): - log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge') - try: - startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') - except TypeError: - # Assume we were passed a properly formatted string - pass - - try: - endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') - except TypeError: - # Assume we were passed a properly formatted string - pass - - try: - platform = platform.split(',') - except AttributeError: - # Assume we were passed a list - pass - - params = {"startTime": startTime, - "endTime": endTime, - "bbox": bbox, - "minDepth": depth_min, - "maxDepth": depth_max, - "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} - - if variable: - params['variable'] = variable - if platform: - params['platform'] = platform - - edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) - - edge_request.raise_for_status() - edge_response = json.loads(edge_request.text) - - # Get all edge results - next_page_url = edge_response.get('next', None) - while next_page_url is not None: - log.debug("requesting %s" % next_page_url) - edge_page_request = session.get(next_page_url) - - edge_page_request.raise_for_status() - edge_page_response = json.loads(edge_page_request.text) - - edge_response['results'].extend(edge_page_response['results']) - - next_page_url = edge_page_response.get('next', None) - - return edge_response diff --git 
a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py deleted file mode 100644 index 3af85d3..0000000 --- a/analysis/webservice/algorithms/doms/mapplot.py +++ /dev/null @@ -1,175 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import string -from cStringIO import StringIO -from multiprocessing import Process, Manager - -import matplotlib -import matplotlib.pyplot as plt -import numpy as np -from mpl_toolkits.basemap import Basemap - -import BaseDomsHandler -import ResultsStorage - -if not matplotlib.get_backend(): - matplotlib.use('Agg') - -PARAMETER_TO_FIELD = { - "sst": "sea_water_temperature", - "sss": "sea_water_salinity" -} - -PARAMETER_TO_UNITS = { - "sst": "($^\circ$ C)", - "sss": "(g/L)" -} - - -def __square(minLon, maxLon, minLat, maxLat): - if maxLat - minLat > maxLon - minLon: - a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0 - minLon -= a - maxLon += a - elif maxLon - minLon > maxLat - minLat: - a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0 - minLat -= a - maxLat += a - - return minLon, maxLon, minLat, maxLat - - -def render(d, lats, lons, z, primary, secondary, parameter): - fig = plt.figure() - ax = fig.add_axes([0.1, 0.1, 0.8, 0.8]) - - ax.set_title(string.upper("%s vs. 
%s" % (primary, secondary))) - # ax.set_ylabel('Latitude') - # ax.set_xlabel('Longitude') - - minLatA = np.min(lats) - maxLatA = np.max(lats) - minLonA = np.min(lons) - maxLonA = np.max(lons) - - minLat = minLatA - (abs(maxLatA - minLatA) * 0.1) - maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1) - - minLon = minLonA - (abs(maxLonA - minLonA) * 0.1) - maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1) - - minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat) - - # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l') - m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat, - resolution='l') - - m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10) - m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10) - - m.drawcoastlines() - m.drawmapboundary(fill_color='#99ffff') - m.fillcontinents(color='#cc9966', lake_color='#99ffff') - - # lats, lons = np.meshgrid(lats, lons) - - masked_array = np.ma.array(z, mask=np.isnan(z)) - z = masked_array - - values = np.zeros(len(z)) - for i in range(0, len(z)): - values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10 - - x, y = m(lons, lats) - - im1 = m.scatter(x, y, values) - - im1.set_array(z) - cb = m.colorbar(im1) - - units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"] - cb.set_label("Difference %s" % units) - - sio = StringIO() - plt.savefig(sio, format='png') - plot = sio.getvalue() - if d is not None: - d['plot'] = plot - return plot - - -class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults): - def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, - computeOptions=None, executionId=None, plot=None): - BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args, - details=details, bounds=bounds, count=count, - computeOptions=computeOptions, executionId=executionId) - self.__lats = lats - self.__lons = lons - self.__z = np.array(z) - self.__parameter = parameter - self.__primary = primary - self.__secondary = secondary - self.__plot = plot - - def toImage(self): - return self.__plot - - -def renderAsync(x, y, z, primary, secondary, parameter): - manager = Manager() - d = manager.dict() - p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter)) - p.start() - p.join() - return d['plot'] - - -def createMapPlot(id, parameter): - with ResultsStorage.ResultsRetrieval() as storage: - params, stats, data = storage.retrieveResults(id) - - primary = params["primary"] - secondary = params["matchup"][0] - - lats = [] - lons = [] - z = [] - - field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] - - for entry in data: - for match in entry["matches"]: - if match["source"] == secondary: - - if field in entry and field in match: - a = entry[field] - b = match[field] - z.append((a - b)) - z.append((a - b)) - else: - z.append(1.0) - z.append(1.0) - lats.append(entry["y"]) - lons.append(entry["x"]) - lats.append(match["y"]) - lons.append(match["x"]) - - plot = renderAsync(lats, lons, z, primary, secondary, parameter) - r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary, - args=params, - details=stats, bounds=None, count=None, computeOptions=None, 
executionId=id, plot=plot) - return r diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py deleted file mode 100644 index 2ff57ee..0000000 --- a/analysis/webservice/algorithms/doms/scatterplot.py +++ /dev/null @@ -1,118 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import string -from cStringIO import StringIO -from multiprocessing import Process, Manager - -import matplotlib -import matplotlib.pyplot as plt - -import BaseDomsHandler -import ResultsStorage - -if not matplotlib.get_backend(): - matplotlib.use('Agg') - -PARAMETER_TO_FIELD = { - "sst": "sea_water_temperature", - "sss": "sea_water_salinity" -} - -PARAMETER_TO_UNITS = { - "sst": "($^\circ$ C)", - "sss": "(g/L)" -} - - -def render(d, x, y, z, primary, secondary, parameter): - fig, ax = plt.subplots() - - ax.set_title(string.upper("%s vs. %s" % (primary, secondary))) - - units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS[ - "sst"] - ax.set_ylabel("%s %s" % (secondary, units)) - ax.set_xlabel("%s %s" % (primary, units)) - - ax.scatter(x, y) - - sio = StringIO() - plt.savefig(sio, format='png') - d['plot'] = sio.getvalue() - - -class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults): - - def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, - computeOptions=None, executionId=None, plot=None): - BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds, - count=count, computeOptions=computeOptions, executionId=executionId) - self.__primary = primary - self.__secondary = secondary - self.__x = x - self.__y = y - self.__z = z - self.__parameter = parameter - self.__plot = plot - - def toImage(self): - return self.__plot - - -def renderAsync(x, y, z, primary, secondary, parameter): - manager = Manager() - d = manager.dict() - p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter)) - p.start() - p.join() - return d['plot'] - - -def createScatterPlot(id, parameter): - with ResultsStorage.ResultsRetrieval() as storage: - params, stats, data = storage.retrieveResults(id) - - primary = params["primary"] - secondary = params["matchup"][0] - - x, y, z = createScatterTable(data, secondary, parameter) - - plot = renderAsync(x, y, z, primary, secondary, parameter) - - r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary, - args=params, details=stats, - bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) - return r - - -def createScatterTable(results, secondary, parameter): - x = [] - y = [] - z = [] - - field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else 
PARAMETER_TO_FIELD["sst"] - - for entry in results: - for match in entry["matches"]: - if match["source"] == secondary: - if field in entry and field in match: - a = entry[field] - b = match[field] - x.append(a) - y.append(b) - z.append(a - b) - - return x, y, z diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py deleted file mode 100644 index 67a2276..0000000 --- a/analysis/webservice/algorithms/doms/subsetter.py +++ /dev/null @@ -1,260 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import os -import tempfile -import zipfile -from datetime import datetime - -import requests - -import BaseDomsHandler -from webservice.NexusHandler import nexus_handler -from webservice.webmodel import NexusProcessingException - -ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' - - -def is_blank(my_string): - return not (my_string and my_string.strip() != '') - - -@nexus_handler -class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): - name = "DOMS Subsetter" - path = "/domssubset" - description = "Subset DOMS sources given the search domain" - - params = { - "dataset": { - "name": "NEXUS Dataset", - "type": "string", - "description": "The NEXUS dataset. Optional but at least one of 'dataset' or 'insitu' are required" - }, - "insitu": { - "name": "In Situ sources", - "type": "comma-delimited string", - "description": "The in situ source(s). Optional but at least one of 'dataset' or 'insitu' are required" - }, - "parameter": { - "name": "Data Parameter", - "type": "string", - "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required" - }, - "startTime": { - "name": "Start Time", - "type": "string", - "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" - }, - "endTime": { - "name": "End Time", - "type": "string", - "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" - }, - "b": { - "name": "Bounding box", - "type": "comma-delimited float", - "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " - "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" - }, - "depthMin": { - "name": "Minimum Depth", - "type": "float", - "description": "Minimum depth of measurements. Must be less than depthMax. Optional" - }, - "depthMax": { - "name": "Maximum Depth", - "type": "float", - "description": "Maximum depth of measurements. Must be greater than depthMin. Optional" - }, - "platforms": { - "name": "Platforms", - "type": "comma-delimited integer", - "description": "Platforms to include for subset consideration. Optional" - }, - "output": { - "name": "Output", - "type": "string", - "description": "Output type. Only 'ZIP' is currently supported. 
Required" - } - } - singleton = True - - def __init__(self): - BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - - def parse_arguments(self, request): - # Parse input arguments - self.log.debug("Parsing arguments") - - primary_ds_name = request.get_argument('dataset', None) - matchup_ds_names = request.get_argument('insitu', None) - - if is_blank(primary_ds_name) and is_blank(matchup_ds_names): - raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required", - code=400) - - if matchup_ds_names is not None: - try: - matchup_ds_names = matchup_ds_names.split(',') - except: - raise NexusProcessingException(reason="'insitu' argument should be a comma-seperated list", code=400) - - parameter_s = request.get_argument('parameter', None) - if parameter_s not in ['sst', 'sss', 'wind']: - raise NexusProcessingException( - reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) - - try: - start_time = request.get_start_datetime() - start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") - except: - raise NexusProcessingException( - reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - try: - end_time = request.get_end_datetime() - end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") - except: - raise NexusProcessingException( - reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - - if start_time > end_time: - raise NexusProcessingException( - reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( - request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), - code=400) - - try: - bounding_polygon = request.get_bounding_polygon() - except: - raise NexusProcessingException( - reason="'b' argument is required. 
Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", - code=400) - - depth_min = request.get_decimal_arg('depthMin', default=None) - depth_max = request.get_decimal_arg('depthMax', default=None) - - if depth_min is not None and depth_max is not None and depth_min >= depth_max: - raise NexusProcessingException( - reason="Depth Min should be less than Depth Max", code=400) - - platforms = request.get_argument('platforms', None) - if platforms is not None: - try: - p_validation = platforms.split(',') - p_validation = [int(p) for p in p_validation] - del p_validation - except: - raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) - - return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ - bounding_polygon, depth_min, depth_max, platforms - - def calc(self, request, **args): - - primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ - bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request) - - primary_url = "https://doms.jpl.nasa.gov/datainbounds" - primary_params = { - 'ds': primary_ds_name, - 'parameter': parameter_s, - 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), - 'startTime': start_time, - 'endTime': end_time, - 'output': "CSV" - } - - matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset" - matchup_params = { - 'source': None, - 'parameter': parameter_s, - 'startTime': start_time, - 'endTime': end_time, - 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), - 'depthMin': depth_min, - 'depthMax': depth_max, - 'platforms': platforms, - 'output': 'CSV' - } - - primary_temp_file_path = None - matchup_downloads = None - - with requests.session() as session: - - if not is_blank(primary_ds_name): - # Download primary - primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv') - download_file(primary_url, primary_temp_file_path, session, params=primary_params) - - if len(matchup_ds_names) > 0: - # Download matchup - matchup_downloads = {} - for matchup_ds in matchup_ds_names: - matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv') - matchup_params['source'] = matchup_ds - download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params) - - # Zip downloads - date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"), - datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d")) - bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds - zip_dir = tempfile.mkdtemp() - zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds) - with zipfile.ZipFile(zip_path, 'w') as my_zip: - if primary_temp_file_path: - my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds)) - if matchup_downloads: - for matchup_ds, download in matchup_downloads.iteritems(): - my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds)) - - # Clean up - if primary_temp_file_path: - os.remove(primary_temp_file_path) - if matchup_downloads: - for matchup_ds, download in matchup_downloads.iteritems(): - os.remove(download[1]) - - return SubsetResult(zip_path) - - -class SubsetResult(object): - def __init__(self, zip_path): - self.zip_path = zip_path - - def toJson(self): - raise NotImplementedError - - def toZip(self): - with open(self.zip_path, 'rb') as zip_file: - zip_contents = 
zip_file.read() - - return zip_contents - - def cleanup(self): - os.remove(self.zip_path) - - -def download_file(url, filepath, session, params=None): - r = session.get(url, params=params, stream=True) - with open(filepath, 'wb') as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: # filter out keep-alive new chunks - f.write(chunk) diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py deleted file mode 100644 index c47d450..0000000 --- a/analysis/webservice/algorithms/doms/values.py +++ /dev/null @@ -1,72 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -PLATFORMS = [ - {"id": 1, "desc": "ship"}, - {"id": 2, "desc": "moored surface buoy"}, - {"id": 3, "desc": "drifting surface float"}, - {"id": 4, "desc": "drifting subsurface profiling float"}, - {"id": 5, "desc": "autonomous underwater vehicle"}, - {"id": 6, "desc": "offshore structure"}, - {"id": 7, "desc": "coastal structure"}, - {"id": 8, "desc": "towed unmanned submersible"}, - {"id": 9, "desc": "orbiting satellite"} -] - -DEVICES = [ - {"id": 1, "desc": "bathythermographs"}, - {"id": 2, "desc": "discrete water samplers"}, - {"id": 3, "desc": "CTD"}, - {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"}, - {"id": 5, "desc": "radiometers"}, - {"id": 6, "desc": "scatterometers"} -] - -MISSIONS = [ - {"id": 1, "desc": "SAMOS"}, - {"id": 2, "desc": "ICOADS"}, - {"id": 3, "desc": "Aquarius"}, - {"id": 4, "desc": "SPURS1"} -] - - -def getDescById(list, id): - for item in list: - if item["id"] == id: - return item["desc"] - return id - - -def getPlatformById(id): - return getDescById(PLATFORMS, id) - - -def getDeviceById(id): - return getDescById(DEVICES, id) - - -def getMissionById(id): - return getDescById(MISSIONS, id) - - -def getDescByListNameAndId(listName, id): - if listName.upper() == "PLATFORM": - return getPlatformById(id) - elif listName.upper() == "DEVICE": - return getDeviceById(id) - elif listName.upper() == "MISSION": - return getMissionById(id) - else: - raise Exception("Invalid list name specified ('%s')" % listName) diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py deleted file mode 100644 index 7639c00..0000000 --- a/analysis/webservice/algorithms/doms/workerthread.py +++ /dev/null @@ -1,61 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import threading - - -class WorkerThread(threading.Thread): - - def __init__(self, method, params): - threading.Thread.__init__(self) - self.method = method - self.params = params - self.completed = False - self.results = None - - def run(self): - self.results = self.method(*self.params) - self.completed = True - - -def __areAllComplete(threads): - for thread in threads: - if not thread.completed: - return False - - return True - - -def wait(threads, startFirst=False, poll=0.5): - if startFirst: - for thread in threads: - thread.start() - - while not __areAllComplete(threads): - threading._sleep(poll) - - -def foo(param1, param2): - print param1, param2 - return "c" - - -if __name__ == "__main__": - - thread = WorkerThread(foo, params=("a", "b")) - thread.start() - while not thread.completed: - threading._sleep(0.5) - print thread.results diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py deleted file mode 100644 index 9ae7557..0000000 --- a/analysis/webservice/algorithms_spark/Matchup.py +++ /dev/null @@ -1,703 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
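workerthread.py, removed just above, wraps a callable in a thread, stores its return value, and busy-waits on a completed flag via the private threading._sleep, which no longer exists in Python 3. A minimal standalone sketch of the same pattern using only documented standard-library calls (the class name here is hypothetical, not taken from the commit):

import threading

class BackgroundCall(threading.Thread):
    # Run method(*params) in a background thread and keep the return value.
    def __init__(self, method, params):
        threading.Thread.__init__(self)
        self.method = method
        self.params = params
        self.results = None

    def run(self):
        self.results = self.method(*self.params)

if __name__ == "__main__":
    worker = BackgroundCall(lambda a, b: a + b, params=(1, 2))
    worker.start()
    worker.join()            # join() replaces the completed/poll loop
    print(worker.results)    # -> 3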
- - - -import json -import logging -import threading -from datetime import datetime -from itertools import chain -from math import cos, radians - -import numpy as np -import pyproj -import requests -from nexustiles.nexustiles import NexusTileService -from pytz import timezone, UTC -from scipy import spatial -from shapely import wkt -from shapely.geometry import Point -from shapely.geometry import box -from shapely.geos import WKTReadingError - -from webservice.NexusHandler import nexus_handler -from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler -from webservice.algorithms.doms import config as edge_endpoints -from webservice.algorithms.doms import values as doms_values -from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults -from webservice.algorithms.doms.ResultsStorage import ResultsStorage -from webservice.webmodel import NexusProcessingException - -EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) -ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' - - -def iso_time_to_epoch(str_time): - return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace( - tzinfo=UTC) - EPOCH).total_seconds() - - -@nexus_handler -class Matchup(NexusCalcSparkHandler): - name = "Matchup" - path = "/match_spark" - description = "Match measurements between two or more datasets" - - params = { - "primary": { - "name": "Primary Dataset", - "type": "string", - "description": "The Primary dataset used to find matches for. Required" - }, - "matchup": { - "name": "Match-Up Datasets", - "type": "comma-delimited string", - "description": "The Dataset(s) being searched for measurements that match the Primary. Required" - }, - "parameter": { - "name": "Match-Up Parameter", - "type": "string", - "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required" - }, - "startTime": { - "name": "Start Time", - "type": "string", - "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" - }, - "endTime": { - "name": "End Time", - "type": "string", - "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" - }, - "b": { - "name": "Bounding box", - "type": "comma-delimited float", - "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " - "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" - }, - "depthMin": { - "name": "Minimum Depth", - "type": "float", - "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit" - }, - "depthMax": { - "name": "Maximum Depth", - "type": "float", - "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit" - }, - "tt": { - "name": "Time Tolerance", - "type": "long", - "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400" - }, - "rt": { - "name": "Radius Tolerance", - "type": "float", - "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000" - }, - "platforms": { - "name": "Platforms", - "type": "comma-delimited integer", - "description": "Platforms to include for matchup consideration. Required" - }, - "matchOnce": { - "name": "Match Once", - "type": "boolean", - "description": "Optional True/False flag used to determine if more than one match per primary point is returned. " - + "If true, only the nearest point will be returned for each primary point. 
" - + "If false, all points within the tolerances will be returned for each primary point. Default: False" - }, - "resultSizeLimit": { - "name": "Result Size Limit", - "type": "int", - "description": "Optional integer value that limits the number of results returned from the matchup. " - "If the number of primary matches is greater than this limit, the service will respond with " - "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. " - "Default: 500" - } - } - singleton = True - - def __init__(self, algorithm_config=None, sc=None): - NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True) - self.log = logging.getLogger(__name__) - - def parse_arguments(self, request): - # Parse input arguments - self.log.debug("Parsing arguments") - try: - bounding_polygon = request.get_bounding_polygon() - except: - raise NexusProcessingException( - reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", - code=400) - primary_ds_name = request.get_argument('primary', None) - if primary_ds_name is None: - raise NexusProcessingException(reason="'primary' argument is required", code=400) - matchup_ds_names = request.get_argument('matchup', None) - if matchup_ds_names is None: - raise NexusProcessingException(reason="'matchup' argument is required", code=400) - - parameter_s = request.get_argument('parameter', 'sst') - if parameter_s not in ['sst', 'sss', 'wind']: - raise NexusProcessingException( - reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) - - try: - start_time = request.get_start_datetime() - except: - raise NexusProcessingException( - reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - try: - end_time = request.get_end_datetime() - except: - raise NexusProcessingException( - reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", - code=400) - - if start_time > end_time: - raise NexusProcessingException( - reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % ( - request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), - code=400) - - depth_min = request.get_decimal_arg('depthMin', default=None) - depth_max = request.get_decimal_arg('depthMax', default=None) - - if depth_min is not None and depth_max is not None and depth_min >= depth_max: - raise NexusProcessingException( - reason="Depth Min should be less than Depth Max", code=400) - - time_tolerance = request.get_int_arg('tt', default=86400) - radius_tolerance = request.get_decimal_arg('rt', default=1000.0) - platforms = request.get_argument('platforms', None) - if platforms is None: - raise NexusProcessingException(reason="'platforms' argument is required", code=400) - try: - p_validation = platforms.split(',') - p_validation = [int(p) for p in p_validation] - del p_validation - except: - raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) - - match_once = request.get_boolean_arg("matchOnce", default=False) - - result_size_limit = request.get_int_arg("resultSizeLimit", default=500) - - start_seconds_from_epoch = long((start_time - EPOCH).total_seconds()) - end_seconds_from_epoch = long((end_time - EPOCH).total_seconds()) - - return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ - start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ - depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit - - def calc(self, request, **args): - start = datetime.utcnow() - # TODO Assuming Satellite primary - bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \ - start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \ - depth_min, depth_max, time_tolerance, radius_tolerance, \ - platforms, match_once, result_size_limit = self.parse_arguments(request) - - with ResultsStorage() as resultsStorage: - - execution_id = str(resultsStorage.insertExecution(None, start, None, None)) - - self.log.debug("Querying for tiles in search domain") - # Get tile ids in box - tile_ids = [tile.tile_id for tile in - self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name, - start_seconds_from_epoch, end_seconds_from_epoch, - fetch_data=False, fl='id', - sort=['tile_min_time_dt asc', 'tile_min_lon asc', - 'tile_min_lat asc'], rows=5000)] - - # Call spark_matchup - self.log.debug("Calling Spark Driver") - try: - spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name, - matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance, - radius_tolerance, platforms, match_once, sc=self._sc) - except Exception as e: - self.log.exception(e) - raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500) - - end = datetime.utcnow() - - self.log.debug("Building and saving results") - args = { - "primary": primary_ds_name, - "matchup": matchup_ds_names, - "startTime": start_time, - "endTime": end_time, - "bbox": request.get_argument('b'), - "timeTolerance": time_tolerance, - "radiusTolerance": float(radius_tolerance), - "platforms": platforms, - "parameter": parameter_s - } - - if depth_min is not None: - args["depthMin"] = float(depth_min) - - if depth_max is not None: - args["depthMax"] = float(depth_max) - - total_keys = len(spark_result.keys()) - total_values = sum(len(v) for v in spark_result.itervalues()) - details = { - "timeToComplete": int((end - start).total_seconds()), - 
"numInSituRecords": 0, - "numInSituMatched": total_values, - "numGriddedChecked": 0, - "numGriddedMatched": total_keys - } - - matches = Matchup.convert_to_matches(spark_result) - - def do_result_insert(): - with ResultsStorage() as storage: - storage.insertResults(results=matches, params=args, stats=details, - startTime=start, completeTime=end, userEmail="", - execution_id=execution_id) - - threading.Thread(target=do_result_insert).start() - - if 0 < result_size_limit < len(matches): - result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None, - computeOptions=None, executionId=execution_id, status_code=202) - else: - result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, - computeOptions=None, executionId=execution_id) - - return result - - @classmethod - def convert_to_matches(cls, spark_result): - matches = [] - for primary_domspoint, matched_domspoints in spark_result.iteritems(): - p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints] - - primary = cls.domspoint_to_dict(primary_domspoint) - primary['matches'] = list(p_matched) - matches.append(primary) - return matches - - @staticmethod - def domspoint_to_dict(domspoint): - return { - "sea_water_temperature": domspoint.sst, - "sea_water_temperature_depth": domspoint.sst_depth, - "sea_water_salinity": domspoint.sss, - "sea_water_salinity_depth": domspoint.sss_depth, - "wind_speed": domspoint.wind_speed, - "wind_direction": domspoint.wind_direction, - "wind_u": domspoint.wind_u, - "wind_v": domspoint.wind_v, - "platform": doms_values.getPlatformById(domspoint.platform), - "device": doms_values.getDeviceById(domspoint.device), - "x": str(domspoint.longitude), - "y": str(domspoint.latitude), - "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude), - "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC), - "fileurl": domspoint.file_url, - "id": domspoint.data_id, - "source": domspoint.source, - } - - -class DomsPoint(object): - def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None): - - self.time = time - self.longitude = longitude - self.latitude = latitude - self.depth = depth - self.data_id = data_id - - self.wind_u = None - self.wind_v = None - self.wind_direction = None - self.wind_speed = None - self.sst = None - self.sst_depth = None - self.sss = None - self.sss_depth = None - self.source = None - self.depth = None - self.platform = None - self.device = None - self.file_url = None - - def __repr__(self): - return str(self.__dict__) - - @staticmethod - def from_nexus_point(nexus_point, tile=None, parameter='sst'): - point = DomsPoint() - - point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index) - - # TODO Not an ideal solution; but it works for now. - if parameter == 'sst': - point.sst = nexus_point.data_val.item() - elif parameter == 'sss': - point.sss = nexus_point.data_val.item() - elif parameter == 'wind': - point.wind_u = nexus_point.data_val.item() - try: - point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item() - except (KeyError, IndexError): - pass - try: - point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item() - except (KeyError, IndexError): - pass - try: - point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item() - except (KeyError, IndexError): - pass - else: - raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' 
% parameter) - - point.longitude = nexus_point.longitude.item() - point.latitude = nexus_point.latitude.item() - - point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ') - - try: - point.depth = nexus_point.depth - except KeyError: - # No depth associated with this measurement - pass - - point.sst_depth = 0 - point.source = tile.dataset - point.file_url = tile.granule - - # TODO device should change based on the satellite making the observations. - point.platform = 9 - point.device = 5 - return point - - @staticmethod - def from_edge_point(edge_point): - point = DomsPoint() - - try: - x, y = wkt.loads(edge_point['point']).coords[0] - except WKTReadingError: - try: - x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] - except ValueError: - y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] - - point.longitude = x - point.latitude = y - - point.time = edge_point['time'] - - point.wind_u = edge_point.get('eastward_wind') - point.wind_v = edge_point.get('northward_wind') - point.wind_direction = edge_point.get('wind_direction') - point.wind_speed = edge_point.get('wind_speed') - point.sst = edge_point.get('sea_water_temperature') - point.sst_depth = edge_point.get('sea_water_temperature_depth') - point.sss = edge_point.get('sea_water_salinity') - point.sss_depth = edge_point.get('sea_water_salinity_depth') - point.source = edge_point.get('source') - point.platform = edge_point.get('platform') - point.device = edge_point.get('device') - point.file_url = edge_point.get('fileurl') - - try: - point.data_id = unicode(edge_point['id']) - except KeyError: - point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude) - - return point - - -from threading import Lock - -DRIVER_LOCK = Lock() - - -def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max, - time_tolerance, radius_tolerance, platforms, match_once, sc=None): - from functools import partial - - with DRIVER_LOCK: - # Broadcast parameters - primary_b = sc.broadcast(primary_ds_name) - matchup_b = sc.broadcast(matchup_ds_names) - depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None) - depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None) - tt_b = sc.broadcast(time_tolerance) - rt_b = sc.broadcast(float(radius_tolerance)) - platforms_b = sc.broadcast(platforms) - bounding_wkt_b = sc.broadcast(bounding_wkt) - parameter_b = sc.broadcast(parameter) - - # Parallelize list of tile ids - rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids))) - - # Map Partitions ( list(tile_id) ) - rdd_filtered = rdd.mapPartitions( - partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b, - rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b, - depth_max_b=depth_max_b), preservesPartitioning=True) \ - .filter(lambda p_m_tuple: abs( - iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance) - - if match_once: - # Only the 'nearest' point for each primary should be returned. 
Add an extra map/reduce which calculates - # the distance and finds the minimum - - # Method used for calculating the distance between 2 DomsPoints - from pyproj import Geod - - def dist(primary, matchup): - wgs84_geod = Geod(ellps='WGS84') - lat1, lon1 = (primary.latitude, primary.longitude) - lat2, lon2 = (matchup.latitude, matchup.longitude) - az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2) - return distance - - rdd_filtered = rdd_filtered \ - .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \ - .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \ - .mapValues(lambda x: [x[0]]) - else: - rdd_filtered = rdd_filtered \ - .combineByKey(lambda value: [value], # Create 1 element list - lambda value_list, value: value_list + [value], # Add 1 element to list - lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together - - result_as_map = rdd_filtered.collectAsMap() - - return result_as_map - - -def determine_parllelism(num_tiles): - """ - Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions. - Also, don't go below the default of 8 - """ - num_partitions = max(min(num_tiles / 140, 128), 8) - return num_partitions - - -def add_meters_to_lon_lat(lon, lat, meters): - """ - Uses a simple approximation of - 1 degree latitude = 111,111 meters - 1 degree longitude = 111,111 meters * cosine(latitude) - :param lon: longitude to add meters to - :param lat: latitude to add meters to - :param meters: meters to add to the longitude and latitude values - :return: (longitude, latitude) increased by given meters - """ - longitude = lon + ((meters / 111111) * cos(radians(lat))) - latitude = lat + (meters / 111111) - - return longitude, latitude - - -def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b, - bounding_wkt_b, depth_min_b, depth_max_b): - the_time = datetime.now() - tile_ids = list(tile_ids) - if len(tile_ids) == 0: - return [] - tile_service = NexusTileService() - - # Determine the spatial temporal extents of this partition of tiles - tiles_bbox = tile_service.get_bounding_box(tile_ids) - tiles_min_time = tile_service.get_min_time(tile_ids) - tiles_max_time = tile_service.get_max_time(tile_ids) - - # Increase spatial extents by the radius tolerance - matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1], - -1 * rt_b.value) - matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value) - - # Don't go outside of the search domain - search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds - matchup_min_lon = max(matchup_min_lon, search_min_x) - matchup_min_lat = max(matchup_min_lat, search_min_y) - matchup_max_lon = min(matchup_max_lon, search_max_x) - matchup_max_lat = min(matchup_max_lat, search_max_y) - - # Find the centroid of the matchup bounding box and initialize the projections - matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0] - aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1]) - lonlat_proj = pyproj.Proj(proj='lonlat') - - # Increase temporal extents by the time tolerance - matchup_min_time = tiles_min_time - tt_b.value - matchup_max_time = tiles_max_time + tt_b.value - print "%s Time to determine spatial-temporal extents for partition %s to %s" % ( - 
str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) - - # Query edge for all points within the spatial-temporal extents of this partition - the_time = datetime.now() - edge_session = requests.Session() - edge_results = [] - with edge_session: - for insitudata_name in matchup_b.value.split(','): - bbox = ','.join( - [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)]) - edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox, - platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session) - if edge_response['totalResults'] == 0: - continue - r = edge_response['results'] - for p in r: - p['source'] = insitudata_name - edge_results.extend(r) - print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) - if len(edge_results) == 0: - return [] - - # Convert edge points to utm - the_time = datetime.now() - matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32) - for n, edge_point in enumerate(edge_results): - try: - x, y = wkt.loads(edge_point['point']).coords[0] - except WKTReadingError: - try: - x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0] - except ValueError: - y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0] - - matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y) - print "%s Time to convert match points for partition %s to %s" % ( - str(datetime.now() - the_time), tile_ids[0], tile_ids[-1]) - - # Build kdtree from matchup points - the_time = datetime.now() - m_tree = spatial.cKDTree(matchup_points, leafsize=30) - print "%s Time to build matchup tree" % (str(datetime.now() - the_time)) - - # The actual matching happens in the generator. 
This is so that we only load 1 tile into memory at a time - match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value, - parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id - in tile_ids] - - return chain(*match_generators) - - -def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt, - search_parameter, radius_tolerance, lonlat_proj, aeqd_proj): - from nexustiles.model.nexusmodel import NexusPoint - from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains - - # Load tile - try: - the_time = datetime.now() - tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt), - tile_service.find_tile_by_id(tile_id))[0] - print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id) - except IndexError: - # This should only happen if all measurements in a tile become masked after applying the bounding polygon - raise StopIteration - - # Convert valid tile lat,lon tuples to UTM tuples - the_time = datetime.now() - # Get list of indices of valid values - valid_indices = tile.get_indices() - primary_points = np.array( - [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for - aslice in valid_indices]) - - print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id) - - a_time = datetime.now() - p_tree = spatial.cKDTree(primary_points, leafsize=30) - print "%s Time to build primary tree" % (str(datetime.now() - a_time)) - - a_time = datetime.now() - matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance) - print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id) - for i, point_matches in enumerate(matched_indexes): - if len(point_matches) > 0: - p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]], - tile.longitudes[valid_indices[i][2]], None, - tile.times[valid_indices[i][0]], valid_indices[i], - tile.data[tuple(valid_indices[i])]) - p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter) - for m_point_index in point_matches: - m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index]) - yield p_doms_point, m_doms_point - - -def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000, - startIndex=0, stats=True, session=None): - try: - startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ') - except TypeError: - # Assume we were passed a properly formatted string - pass - - try: - endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ') - except TypeError: - # Assume we were passed a properly formatted string - pass - - try: - platform = platform.split(',') - except AttributeError: - # Assume we were passed a list - pass - - params = {"variable": variable, - "startTime": startTime, - "endTime": endTime, - "bbox": bbox, - "minDepth": depth_min, - "maxDepth": depth_max, - "platform": platform, - "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()} - - if session is not None: - edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) - else: - edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params) - - edge_request.raise_for_status() - edge_response = json.loads(edge_request.text) - - # Get all edge results - 
next_page_url = edge_response.get('next', None) - while next_page_url is not None: - if session is not None: - edge_page_request = session.get(next_page_url) - else: - edge_page_request = requests.get(next_page_url) - - edge_page_request.raise_for_status() - edge_page_response = json.loads(edge_page_request.text) - - edge_response['results'].extend(edge_page_response['results']) - - next_page_url = edge_page_response.get('next', None) - - return edge_response diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py index 12b84c1..9e77887 100644 --- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py +++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py @@ -6,6 +6,7 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField, NumberMetricsField from webservice.webmodel import NexusProcessingException +logger = logging.getLogger(__name__) class NexusCalcSparkHandler(NexusCalcHandler): class SparkJobContext(object): diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py index d6ed83f..a25c8d5 100644 --- a/analysis/webservice/algorithms_spark/__init__.py +++ b/analysis/webservice/algorithms_spark/__init__.py @@ -20,7 +20,6 @@ import ClimMapSpark import CorrMapSpark import DailyDifferenceAverageSpark import HofMoellerSpark -import Matchup import MaximaMinimaSpark import NexusCalcSparkHandler import TimeAvgMapSpark @@ -47,11 +46,6 @@ if module_exists("pyspark"): pass try: - import Matchup - except ImportError: - pass - - try: import TimeAvgMapSpark except ImportError: pass diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini index 2644ade..a1ecb2c 100644 --- a/analysis/webservice/config/web.ini +++ b/analysis/webservice/config/web.ini @@ -14,4 +14,4 @@ static_enabled=true static_dir=static [modules] -module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms \ No newline at end of file +module_dirs=webservice.algorithms,webservice.algorithms_spark \ No newline at end of file diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py index ad7a773..c8a6852 100644 --- a/analysis/webservice/webapp.py +++ b/analysis/webservice/webapp.py @@ -69,6 +69,7 @@ if __name__ == "__main__": help='time out for solr requests in seconds, default (60) is ok for most deployments' ' when solr performances are not good this might need to be increased') define('solr_host', help='solr host and port') + define('cassandra_host', help='cassandra host') parse_command_line() algorithm_config = inject_args_in_config(options, algorithm_config) @@ -114,7 +115,7 @@ if __name__ == "__main__": else: handlers.append( (clazzWrapper.path, NexusRequestHandler, - dict(clazz=clazzWrapper, thread_pool=request_thread_pool))) + dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool))) class VersionHandler(tornado.web.RequestHandler): diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default index 0fe8d9d..e4ab6c1 100644 --- a/data-access/nexustiles/config/datastores.ini.default +++ b/data-access/nexustiles/config/datastores.ini.default @@ -1,5 +1,5 @@ [cassandra] -host=sdap-cassandra +host=localhost port=9042 keyspace=nexustiles local_datacenter=datacenter1 @@ -15,7 +15,7 @@ table=nexus-jpl-table region=us-west-2 
[solr] -host=sdap-solr:8983 +host=localhost:8983 core=nexustiles [datastore] diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 24db1ae..3e7e2f8 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -102,10 +102,10 @@ class NexusTileService(object): def override_config(self, config): for section in config.sections(): - if self._config.has_section(section): # only override preexisting section, ignores the other + if self._config.has_section(section): # only override preexisting section, ignores the other for option in config.options(section): - self._config.set(section, option, config.get(section, option)) - + if config.get(section, option) is not None: + self._config.set(section, option, config.get(section, option)) def get_dataseries_list(self, simple=False): if simple: diff --git a/helm/templates/webapp.yml b/helm/templates/webapp.yml index e363ab7..4af0e38 100644 --- a/helm/templates/webapp.yml +++ b/helm/templates/webapp.yml @@ -9,7 +9,7 @@ spec: pythonVersion: "2" mode: cluster image: {{ .Values.webapp.distributed.image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent mainApplicationFile: local:///incubator-sdap-nexus/analysis/webservice/webapp.py arguments: - "--solr-host={{ .Release.Name }}-solr-svc:8983" diff --git a/tools/doms/README.md b/tools/doms/README.md deleted file mode 100644 index c49fa4a..0000000 --- a/tools/doms/README.md +++ /dev/null @@ -1,66 +0,0 @@ -# doms_reader.py -The functions in doms_reader.py read a DOMS netCDF file into memory, assemble a list of matches of satellite and in situ data, and optionally output the matches to a CSV file. Each matched pair contains one satellite data record and one in situ data record. - -The DOMS netCDF files hold satellite data and in situ data in different groups (`SatelliteData` and `InsituData`). The `matchIDs` netCDF variable contains pairs of IDs (matches) which reference a satellite data record and an in situ data record in their respective groups. These records have a many-to-many relationship; one satellite record may match to many in situ records, and one in situ record may match to many satellite records. The `assemble_matches` function assembles the individua [...] - -## Requirements -This tool was developed and tested with Python 2.7.5 and 3.7.0a0. -Imported packages: -* argparse -* netcdf4 -* sys -* datetime -* csv -* collections -* logging - - -## Functions -### Function: `assemble_matches(filename)` -Read a DOMS netCDF file into memory and return a list of matches from the file. - -#### Parameters -- `filename` (str): the DOMS netCDF file name. - -#### Returns -- `matches` (list): List of matches. 
- -Each list element in `matches` is a dictionary organized as follows: - For match `m`, netCDF group `GROUP` ('SatelliteData' or 'InsituData'), and netCDF group variable `VARIABLE`: - -`matches[m][GROUP]['matchID']`: netCDF `MatchedRecords` dimension ID for the match -`matches[m][GROUP]['GROUPID']`: GROUP netCDF `dim` dimension ID for the record -`matches[m][GROUP][VARIABLE]`: variable value - -For example, to access the timestamps of the satellite data and the in situ data of the first match in the list, along with the `MatchedRecords` dimension ID and the groups' `dim` dimension ID: -```python -matches[0]['SatelliteData']['time'] -matches[0]['InsituData']['time'] -matches[0]['SatelliteData']['matchID'] -matches[0]['SatelliteData']['SatelliteDataID'] -matches[0]['InsituData']['InsituDataID'] -``` - - -### Function: `matches_to_csv(matches, csvfile)` -Write the DOMS matches to a CSV file. Include a header of column names which are based on the group and variable names from the netCDF file. - -#### Parameters: -- `matches` (list): the list of dictionaries containing the DOMS matches as returned from the `assemble_matches` function. -- `csvfile` (str): the name of the CSV output file. - -## Usage -For example, to read some DOMS netCDF file called `doms_file.nc`: -### Command line -The main function for `doms_reader.py` takes one `filename` parameter (`doms_file.nc` argument in this example) for the DOMS netCDF file to read, calls the `assemble_matches` function, then calls the `matches_to_csv` function to write the matches to a CSV file `doms_matches.csv`. -``` -python doms_reader.py doms_file.nc -``` -``` -python3 doms_reader.py doms_file.nc -``` -### Importing `assemble_matches` -```python -from doms_reader import assemble_matches -matches = assemble_matches('doms_file.nc') -``` diff --git a/tools/doms/doms_reader.py b/tools/doms/doms_reader.py deleted file mode 100644 index c8229c4..0000000 --- a/tools/doms/doms_reader.py +++ /dev/null @@ -1,144 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import argparse -from netCDF4 import Dataset, num2date -import sys -import datetime -import csv -from collections import OrderedDict -import logging - -LOGGER = logging.getLogger("doms_reader") - -def assemble_matches(filename): - """ - Read a DOMS netCDF file and return a list of matches. - - Parameters - ---------- - filename : str - The DOMS netCDF file name. - - Returns - ------- - matches : list - List of matches. Each list element is a dictionary. 
- For match m, netCDF group GROUP (SatelliteData or InsituData), and - group variable VARIABLE: - matches[m][GROUP]['matchID']: MatchedRecords dimension ID for the match - matches[m][GROUP]['GROUPID']: GROUP dim dimension ID for the record - matches[m][GROUP][VARIABLE]: variable value - """ - - try: - # Open the netCDF file - with Dataset(filename, 'r') as doms_nc: - # Check that the number of groups is consistent w/ the MatchedGroups - # dimension - assert len(doms_nc.groups) == doms_nc.dimensions['MatchedGroups'].size,\ - ("Number of groups isn't the same as MatchedGroups dimension.") - - matches = [] - matched_records = doms_nc.dimensions['MatchedRecords'].size - - # Loop through the match IDs to assemble matches - for match in range(0, matched_records): - match_dict = OrderedDict() - # Grab the data from each platform (group) in the match - for group_num, group in enumerate(doms_nc.groups): - match_dict[group] = OrderedDict() - match_dict[group]['matchID'] = match - ID = doms_nc.variables['matchIDs'][match][group_num] - match_dict[group][group + 'ID'] = ID - for var in doms_nc.groups[group].variables.keys(): - match_dict[group][var] = doms_nc.groups[group][var][ID] - - # Create a UTC datetime field from timestamp - dt = num2date(match_dict[group]['time'], - doms_nc.groups[group]['time'].units) - match_dict[group]['datetime'] = dt - LOGGER.info(match_dict) - matches.append(match_dict) - - return matches - except (OSError, IOError) as err: - LOGGER.exception("Error reading netCDF file " + filename) - raise err - -def matches_to_csv(matches, csvfile): - """ - Write the DOMS matches to a CSV file. Include a header of column names - which are based on the group and variable names from the netCDF file. - - Parameters - ---------- - matches : list - The list of dictionaries containing the DOMS matches as returned from - assemble_matches. - csvfile : str - The name of the CSV output file. - """ - # Create a header for the CSV. Column names are GROUP_VARIABLE or - # GROUP_GROUPID. - header = [] - for key, value in matches[0].items(): - for otherkey in value.keys(): - header.append(key + "_" + otherkey) - - try: - # Write the CSV file - with open(csvfile, 'w') as output_file: - csv_writer = csv.writer(output_file) - csv_writer.writerow(header) - for match in matches: - row = [] - for group, data in match.items(): - for value in data.values(): - row.append(value) - csv_writer.writerow(row) - except (OSError, IOError) as err: - LOGGER.exception("Error writing CSV file " + csvfile) - raise err - -if __name__ == '__main__': - """ - Execution: - python doms_reader.py filename - OR - python3 doms_reader.py filename - """ - logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', - level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S') - - p = argparse.ArgumentParser() - p.add_argument('filename', help='DOMS netCDF file to read') - args = p.parse_args() - - doms_matches = assemble_matches(args.filename) - - matches_to_csv(doms_matches, 'doms_matches.csv') - - - - - - - - - - - \ No newline at end of file
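The intent of the webapp.py and nexustiles.py hunks above is that host names supplied on the command line (the helm chart already passes `--solr-host=...`; the new `cassandra_host` option works the same way, and tornado's option parser treats the dashed and underscored spellings as the same flag) override the matching entries in datastores.ini, while flags that were not supplied leave the file defaults alone. A minimal sketch of that merge behaviour, written with Python 3 `configparser` syntax for brevity (the project itself runs on Python 2) and using made-up host names:

```python
# Sketch only -- not code from this commit. Illustrates the override rule added to
# NexusTileService.override_config: only sections that already exist in the file
# config are touched, and only options that were actually supplied (non-None).
from configparser import ConfigParser  # module is named ConfigParser on Python 2

defaults = ConfigParser()
defaults.read_string(u"""
[cassandra]
host = localhost
port = 9042

[solr]
host = localhost:8983
core = nexustiles
""")

# Hypothetical CLI values: --solr-host was passed, --cassandra-host was not.
cli_overrides = {
    ("solr", "host"): "sdap-solr-svc:8983",
    ("cassandra", "host"): None,
}

for (section, option), value in cli_overrides.items():
    if defaults.has_section(section) and value is not None:
        defaults.set(section, option, value)

print(defaults.get("solr", "host"))       # -> sdap-solr-svc:8983
print(defaults.get("cassandra", "host"))  # -> localhost (unset flag left alone)
```

This mirrors the `if config.get(section, option) is not None` guard added in `NexusTileService.override_config`, which is what keeps the new `localhost` defaults in `datastores.ini.default` usable when the webapp is started without any host flags.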