This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch date-normalization
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/date-normalization by this push:
new 38666fb Add normalizeDates option to Hofmueller
38666fb is described below
commit 38666fbee8e8cdc114cdb38fdf11ed0994d1ecd0
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Wed Dec 23 16:08:38 2020 -0800
Add normalizeDates option to Hofmueller
---
.../webservice/algorithms_spark/HofMoellerSpark.py | 74 ++++++++++++----------
.../webservice/algorithms_spark/TimeSeriesSpark.py | 12 +---
analysis/webservice/algorithms_spark/utils.py | 9 +++
3 files changed, 52 insertions(+), 43 deletions(-)
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 7c3041a..f65b25b 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -28,6 +28,7 @@ from pytz import timezone
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -159,7 +160,7 @@ class BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
except:
try:
west, south, east, north = request.get_min_lon(), request.get_min_lat(), \
- request.get_max_lon(), request.get_max_lat()
+ request.get_max_lon(), request.get_max_lat()
bounding_polygon = shapely.geometry.Polygon(
[(west, south), (east, south), (east, north), (west, north), (west, south)])
except:
@@ -192,8 +193,9 @@ class BaseHoffMoellerSparkHandlerImpl(NexusCalcSparkHandler):
start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+ normalize_dates = request.get_normalize_dates()
- return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, normalize_dates
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="mean", append=True):
shape = (len(results), len(results[0][pivot]))
@@ -262,7 +264,7 @@ def hof_tuple_to_dict(t, avg_var_name):
'min': t[7]}
-def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback):
+def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback, normalize_dates):
# Parallelize list of tile ids
rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
if latlon == 0:
@@ -291,34 +293,38 @@ def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_ca
hof_tuple_combine(x[1],
y[1])))
+ # The functions create_combiner, merge_value, and merge_combiner are arguments to RDD.combineByKey()
+ def create_combiner(val):
+ time_in_seconds = val[0]
+ if normalize_dates:
+ time_in_seconds = utils.normalize_date(val[0])
+
+ return {
+ 'sequence': val[1],
+ 'time': time_in_seconds,
+ 'iso_time': datetime.utcfromtimestamp(time_in_seconds).strftime(ISO_8601),
+ avg_var_name_collection: [hof_tuple_to_dict(val, avg_var_name)]
+ }
+
+ def merge_value(x, val):
+ return {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (x[avg_var_name_collection] + [hof_tuple_to_dict(val, avg_var_name)])
+ }
+
+ def merge_combiner(x, y):
+ return {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ 'iso_time': x['iso_time'],
+ avg_var_name_collection: (x[avg_var_name_collection] + y[avg_var_name_collection])
+ }
+
# Convert the tuples to dictionary entries and combine coordinates
# with the same time stamp. Here we have input key = (time)
- results = results.values(). \
- combineByKey(lambda val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': val[1],
- 'time': val[0],
- 'iso_time': datetime.utcfromtimestamp(val[0]).strftime(ISO_8601),
- avg_var_name_collection: [
- hof_tuple_to_dict(val, avg_var_name)]},
- lambda x, val, avg_var_name=avg_var_name,
- avg_var_name_collection=avg_var_name_collection: {
- 'sequence': x['sequence'],
- 'time': x['time'],
- 'iso_time': x['iso_time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- [hof_tuple_to_dict(val, avg_var_name)])},
- lambda x, y,
- avg_var_name_collection=avg_var_name_collection:
- {'sequence': x['sequence'],
- 'time': x['time'],
- 'iso_time': x['iso_time'],
- avg_var_name_collection: (
- x[avg_var_name_collection] +
- y[avg_var_name_collection])}). \
- values(). \
- collect()
+ results = results.values().combineByKey(create_combiner, merge_value, merge_combiner).values().collect()
reduce_duration = (datetime.now() - reduce_start).total_seconds()
metrics_callback(reduce=reduce_duration)
@@ -339,7 +345,7 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -360,7 +366,8 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
- metrics_record.record_metrics)
+ metrics_record.record_metrics,
+ normalize_dates)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry['time'])
for i in range(len(results)):
@@ -394,7 +401,7 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
BaseHoffMoellerSparkHandlerImpl.__init__(self, **kwargs)
def calc(self, compute_options, **args):
- ds, bbox, start_time, end_time = self.parse_arguments(compute_options)
+ ds, bbox, start_time, end_time, normalize_dates = self.parse_arguments(compute_options)
metrics_record = self._create_metrics_record()
calculation_start = datetime.now()
@@ -415,7 +422,8 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl):
self._latlon,
self._tile_service_factory,
nexus_tiles_spark,
- metrics_record.record_metrics)
+ metrics_record.record_metrics,
+ normalize_dates)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
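
For readers following the HofMoellerSpark.py hunk above: it replaces three inline lambdas with named create_combiner / merge_value / merge_combiner functions passed to RDD.combineByKey(), and applies the optional date normalization once, when a combiner is first created. Below is a minimal, self-contained sketch of that pattern under assumed names (normalize_time, 'means', the sample keys); it is illustrative only, not the project's code, and assumes a local pyspark install.

    # Sketch of the combineByKey pattern used in the hunk above; names are
    # illustrative, not the project's API. Requires pyspark to be installed.
    from pyspark import SparkContext

    normalize_dates = True

    def normalize_time(t):
        # Stand-in for utils.normalize_date(): truncate to the start of the day.
        return t - (t % 86400)

    def create_combiner(val):
        # val = (time_in_seconds, measurement); normalization happens once, here.
        t = normalize_time(val[0]) if normalize_dates else val[0]
        return {'time': t, 'means': [val[1]]}

    def merge_value(acc, val):
        # Fold one more (time, measurement) record into an existing combiner.
        return {'time': acc['time'], 'means': acc['means'] + [val[1]]}

    def merge_combiner(a, b):
        # Merge two combiners built on different partitions.
        return {'time': a['time'], 'means': a['means'] + b['means']}

    if __name__ == '__main__':
        sc = SparkContext(appName='combineByKey-sketch')
        rdd = sc.parallelize([('2020-12', (1608768000, 3.5)),
                              ('2020-12', (1608854400, 4.0))])
        print(rdd.combineByKey(create_combiner, merge_value, merge_combiner).collect())
        sc.stop()

Naming the three functions also lets the closure capture normalize_dates directly, instead of threading extra default arguments through each lambda as the removed code did.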
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index cddbba8..1eeecce 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -13,14 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import calendar
import itertools
import logging
import traceback
from cStringIO import StringIO
from datetime import datetime
from functools import partial
-import time
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
@@ -29,12 +27,12 @@ import pytz
import shapely.geometry
import shapely.wkt
from backports.functools_lru_cache import lru_cache
-from nexustiles.nexustiles import NexusTileService
from pytz import timezone
from scipy import stats
from webservice import Filtering as filtering
from webservice.NexusHandler import nexus_handler
from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms_spark import utils
from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -466,12 +464,6 @@ def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metric
return results, {}
-def normalize_date(time_in_seconds):
- dt = datetime.utcfromtimestamp(time_in_seconds)
- normalized_dt = dt.replace(day=1)
- return int(time.mktime(normalized_dt.timetuple()))
-
-
def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates, tile_in_spark):
import shapely.wkt
from datetime import datetime
@@ -532,7 +524,7 @@ def calc_average_on_day(tile_service_factory, metrics_callback, normalize_dates,
# Return Stats by day
if normalize_dates:
- timeinseconds = normalize_date(timeinseconds)
+ timeinseconds = utils.normalize_date(timeinseconds)
stat = {
'min': data_min,
diff --git a/analysis/webservice/algorithms_spark/utils.py b/analysis/webservice/algorithms_spark/utils.py
new file mode 100644
index 0000000..5556b15
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/utils.py
@@ -0,0 +1,9 @@
+from datetime import datetime
+import pytz
+import calendar
+
+
+def normalize_date(time_in_seconds):
+ dt = datetime.utcfromtimestamp(time_in_seconds)
+ normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.utc)
+ return int(calendar.timegm(normalized_dt.timetuple()))
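
The new helper snaps any timestamp to midnight UTC on the first of its month, so monthly granules stamped on different days of the month collapse onto the same time key. A quick usage sketch follows; the function body is copied from the hunk above so the example runs on its own, and the sample timestamp is arbitrary.

    # Usage sketch for utils.normalize_date(): any time within a month maps
    # to 00:00:00 UTC on the first of that month.
    import calendar
    from datetime import datetime
    import pytz

    def normalize_date(time_in_seconds):
        dt = datetime.utcfromtimestamp(time_in_seconds)
        normalized_dt = dt.replace(day=1, hour=0, minute=0, second=0,
                                   microsecond=0, tzinfo=pytz.utc)
        return int(calendar.timegm(normalized_dt.timetuple()))

    # 2020-12-23T16:08:38Z normalizes to 2020-12-01T00:00:00Z
    ts = calendar.timegm(datetime(2020, 12, 23, 16, 8, 38).timetuple())
    print(datetime.utcfromtimestamp(normalize_date(ts)))  # 2020-12-01 00:00:00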