sdap-commits mailing list archives

From: eamonf...@apache.org
Subject: [incubator-sdap-nexus] branch date-normalization updated: Add normalizeDates option to Hofmueller
Date: Thu, 24 Dec 2020 00:08:54 GMT
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch date-normalization
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/date-normalization by this push:
     new 38666fb  Add normalizeDates option to Hofmueller
38666fb is described below

commit 38666fbee8e8cdc114cdb38fdf11ed0994d1ecd0
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Wed Dec 23 16:08:38 2020 -0800

    Add normalizeDates option to Hofmueller
---
 .../webservice/algorithms_spark/HofMoellerSpark.py | 74 ++++++++++++----------
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 12 +---
 analysis/webservice/algorithms_spark/utils.py      |  9 +++
 3 files changed, 52 insertions(+), 43 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 7c3041a..f65b25b 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -28,6 +28,7 @@ from pytz import timezone
 
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -159,7 +160,7 @@ class BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
         except:
             try:
                 west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
-                                           request.get_max_lon(), request.get_max_lat()
+                    request.get_max_lon(), request.get_max_lat()
                 bounding_polygon = shapely.geometry.Polygon(
                     [(west, south), (east, south), (east, north), (west, north), (west, south)])
             except:
@@ -192,8 +193,9 @@ class BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
 
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+        normalize_dates = request.get_normalize_dates()
 
-        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+        return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, normalize_dates
 
     def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="mean", append=True):
         shape = (len(results), len(results[0][pivot]))
@@ -262,7 +264,7 @@ def hof_tuple_to_dict(t, avg_var_name):
             'min': t[7]}
 
 
-def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
     # Parallelize list of tile ids
     rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
     if latlon == 0:
@@ -291,34 +293,38 @@ def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_ca
                                                  hof_tuple_combine(x[1],
                                                                    y[1])))
 
+    # The functions create_combiner, merge_value, and merge_combiner are arguments to RDD.combineByKey()
+    def create_combiner(val):
+        time_in_seconds = val[0]
+        if normalize_dates:
+            time_in_seconds = utils.normalize_date(val[0])
+
+        return {
+            'sequence': val[1],
+            'time': time_in_seconds,
+            'iso_time': datetime.utcfromtimestamp(time_in_seconds).strftime(ISO_8601),
+            avg_var_name_collection: [hof_tuple_to_dict(val, avg_var_name)]
+        }
+
+    def merge_value(x, val):
+        return {
+            'sequence': x['sequence'],
+            'time': x['time'],
+            'iso_time': x['iso_time'],
+            avg_var_name_collection: (x[avg_var_name_collection] + [hof_tuple_to_dict(val, avg_var_name)])
+        }
+
+    def merge_combiner(x, y):
+        return {
+            'sequence': x['sequence'],
+            'time': x['time'],
+            'iso_time': x['iso_time'],
+            avg_var_name_collection: (x[avg_var_name_collection] + y[avg_var_name_collection])
+        }
+
     # Convert the tuples to dictionary entries and combine coordinates
     # with the same time stamp.  Here we have input key = (time)
-    results = results.values(). \
-        combineByKey(lambda val, avg_var_name=avg_var_name,
-                            avg_var_name_collection=avg_var_name_collection: {
-        'sequence': val[1],
-        'time': val[0],
-        'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
-        avg_var_name_collection: [
-            hof_tuple_to_dict(val, avg_var_name)]},
-                     lambda x, val, avg_var_name=avg_var_name,
-                            avg_var_name_collection=avg_var_name_collection: {
-                         'sequence': x['sequence'],
-                         'time': x['time'],
-                         'iso_time': x['iso_time'],
-                         avg_var_name_collection: (
-                                 x[avg_var_name_collection] +
-                                 [hof_tuple_to_dict(val, avg_var_name)])},
-                     lambda x, y,
-                            avg_var_name_collection=avg_var_name_collection:
-                     {'sequence': x['sequence'],
-                      'time': x['time'],
-                      'iso_time': x['iso_time'],
-                      avg_var_name_collection: (
-                              x[avg_var_name_collection] +
-                              y[avg_var_name_collection])}). \
-        values(). \
-        collect()
+    results = results.values().combineByKey(create_combiner, merge_value, merge_combiner).values().collect()
 
     reduce_duration = (datetime.now() - reduce_start).total_seconds()
     metrics_callback(reduce=reduce_duration)
@@ -339,7 +345,7 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
         BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
 
     def calc(self, compute_options, **args):
-        ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+        ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
 
         metrics_record = self._create_metrics_record()
         calculation_start = datetime.now()
@@ -360,7 +366,8 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
                                self._latlon,
                                self._tile_service_factory,
                                nexus_tiles_spark,
-                               metrics_record.record_metrics)
+                               metrics_record.record_metrics,
+                               normalize_dates)
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry['time'])
         for i in range(len(results)):
@@ -394,7 +401,7 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
         BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
 
     def calc(self, compute_options, **args):
-        ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+        ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
 
         metrics_record = self._create_metrics_record()
         calculation_start = datetime.now()
@@ -415,7 +422,8 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
                                self._latlon,
                                self._tile_service_factory,
                                nexus_tiles_spark,
-                               metrics_record.record_metrics)
+                               metrics_record.record_metrics,
+                               normalize_dates)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
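
Note on the refactor above: the inline lambdas previously passed to combineByKey are now the named functions create_combiner, merge_value, and merge_combiner, and date normalization is applied once in create_combiner. A minimal, self-contained sketch of the same combineByKey pattern (not part of this commit; the sample data, key choice, and field names are purely illustrative):

from pyspark import SparkContext

def create_combiner(val):
    # Start a new group for a timestamp from its first (time, mean) tuple.
    return {'time': val[0], 'means': [val[1]]}

def merge_value(acc, val):
    # Fold another tuple with the same timestamp into an existing group.
    return {'time': acc['time'], 'means': acc['means'] + [val[1]]}

def merge_combiner(a, b):
    # Merge two partial groups built on different partitions.
    return {'time': a['time'], 'means': a['means'] + b['means']}

if __name__ == '__main__':
    sc = SparkContext(master='local[1]', appName='combineByKey-sketch')
    pairs = sc.parallelize([(0, (0, 1.0)), (0, (0, 2.0)), (86400, (86400, 3.0))])
    print(pairs.combineByKey(create_combiner, merge_value, merge_combiner).values().collect())
    sc.stop()
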
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index cddbba8..1eeecce 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,14 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import calendar
 import itertools
 import logging
 import traceback
 from cStringIO import StringIO
 from datetime import datetime
 from functools import partial
-import time
 
 import matplotlib.dates as mdates
 import matplotlib.pyplot as plt
@@ -29,12 +27,12 @@ import pytz
 import shapely.geometry
 import shapely.wkt
 from backports.functools_lru_cache import lru_cache
-from nexustiles.nexustiles import NexusTileService
 from pytz import timezone
 from scipy import stats
 from webservice import Filtering as filtering
 from webservice.NexusHandler import nexus_handler
 from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
 from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
 
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -466,12 +464,6 @@ def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metric
     return results, {}
 
 
-def normalize_date(time_in_seconds):
-    dt = datetime.utcfromtimestamp(time_in_seconds)
-    normalized_dt = dt.replace(day=1)
-    return int(time.mktime(normalized_dt.timetuple()))
-
-
 def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
     import shapely.wkt
     from datetime import datetime
@@ -532,7 +524,7 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
 
         # Return Stats by day
         if normalize_dates:
-            timeinseconds = normalize_date(timeinseconds)
+            timeinseconds = utils.normalize_date(timeinseconds)
 
         stat = {
             'min': data_min,
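
A side note on the relocation above (an observation, not something stated in the commit): the removed normalize_date converted back to seconds with time.mktime, which interprets the timetuple in the host's local timezone, whereas the new utils.normalize_date uses calendar.timegm, which interprets it as UTC. A tiny comparison, assuming the interpreter runs in a non-UTC timezone such as US/Pacific:

import calendar
import time
from datetime import datetime

dt = datetime(2020, 12, 1)               # naive wall-clock value
print(calendar.timegm(dt.timetuple()))   # 1606780800 regardless of host timezone
print(time.mktime(dt.timetuple()))       # shifted by the host's UTC offset, e.g. 1606809600.0 under US/Pacific
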
diff --git a/analysis/webservice/algorithms_spark/utils.py b/analysis/webservice/algorithms_spark/utils.py
new file mode 100644
index 0000000..5556b15
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/utils.py
@@ -0,0 +1,9 @@
+from datetime import datetime
+import pytz
+import calendar
+
+
+def normalize_date(time_in_seconds):
+    dt = datetime.utcfromtimestamp(time_in_seconds)
+    normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
+    return int(calendar.timegm(normalized_dt.timetuple()))
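
For reference, a standalone illustration (not part of the commit) of what the new utils.normalize_date does: it truncates an epoch timestamp to 00:00:00 UTC on the first day of its month, so all samples within a month collapse onto the same normalized date.

import calendar
from datetime import datetime

import pytz

def normalize_date(time_in_seconds):
    # Same logic as analysis/webservice/algorithms_spark/utils.py above.
    dt = datetime.utcfromtimestamp(time_in_seconds)
    normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
    return int(calendar.timegm(normalized_dt.timetuple()))

print(normalize_date(1608739718))  # 2020-12-23T16:08:38Z -> 1606780800 (2020-12-01T00:00:00Z)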

