sdap-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eamonf...@apache.org
Subject [incubator-sdap-ingester] branch s3-support updated: it works
Date Thu, 29 Oct 2020 23:51:12 GMT
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/s3-support by this push:
     new 4677af8  it works
4677af8 is described below

commit 4677af8ab43d14ec75522cac625a736c49409f90
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Thu Oct 29 16:51:02 2020 -0700

    it works
---
 collection_manager/collection_manager/main.py      |  2 +-
 .../services/CollectionProcessor.py                |  7 ++++--
 .../services/CollectionWatcher.py                  |  6 +++---
 .../services/history_manager/IngestionHistory.py   | 25 +++++++++-------------
 .../history_manager/SolrIngestionHistory.py        |  2 +-
 5 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3dba6e0..782c70e 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -61,7 +61,7 @@ def get_args() -> argparse.Namespace:
 async def main():
     try:
         options = get_args()
-        ENABLE_S3 = False
+        ENABLE_S3 = True
 
         if ENABLE_S3:
             signature_fun = None
diff --git a/collection_manager/collection_manager/services/CollectionProcessor.py b/collection_manager/collection_manager/services/CollectionProcessor.py
index 86c7691..66c1f38 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -24,7 +24,7 @@ class CollectionProcessor:
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}
 
-    async def process_granule(self, granule: str, modified_time: datetime, collection: Collection):
+    async def process_granule(self, granule: str, modified_time: int, collection: Collection):
         """
         Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it.
         :param granule: A path to a granule file
@@ -35,7 +35,10 @@ class CollectionProcessor:
             return
 
         history_manager = self._get_history_manager(collection.dataset_id)
-        granule_status = await history_manager.get_granule_status(granule, modified_time, collection.date_from, collection.date_to)
+        granule_status = await history_manager.get_granule_status(granule,
+                                                                  modified_time,
+                                                                  collection.date_from,
+                                                                  collection.date_to)
 
         if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING:
             logger.info(f"New granule '{granule}' detected for forward-processing ingestion "
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1fb6abd..abd4a11 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -121,7 +121,7 @@ class CollectionWatcher:
         start = time.perf_counter()
         for collection in collections:
             for granule_path in glob(collection.path, recursive=True):
-                modified_time = os.path.getmtime(granule_path)
+                modified_time = int(os.path.getmtime(granule_path))
                 await self._granule_updated_callback(granule_path, modified_time, collection)
         logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
 
@@ -207,9 +207,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
             try:
                 if collection.owns_file(path):
                     if isinstance(event, S3Event):
-                        modified_time = event.modified_time
+                        modified_time = int(event.modified_time.timestamp())
                     else:
-                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
+                        modified_time = int(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return
diff --git a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
index d901690..7f33c79 100644
--- a/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/IngestionHistory.py
@@ -38,17 +38,16 @@ class GranuleStatus(Enum):
 
 class IngestionHistory(ABC):
     _signature_fun = None
-    _latest_ingested_file_update: float = None
+    _latest_ingested_file_update: int = None
 
-    async def push(self, file_path: str, modified_datetime: datetime):
+    async def push(self, file_path: str, modified_timestamp: int):
         """
         Record a file as having been ingested.
         :param file_path: The full path to the file to record.
         :return: None
         """
-        modified_timestamp = int(modified_datetime.timestamp())
         file_name = IngestionHistory._get_standardized_path(file_path)
-        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_timestamp)
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
         await self._push_record(file_name, signature)
 
         if not self._latest_ingested_file_update:
@@ -60,7 +59,7 @@ class IngestionHistory(ABC):
 
     async def get_granule_status(self,
                                  file_path: str,
-                                 modified_datetime: datetime,
+                                 modified_timestamp: int,
                                  date_from: datetime = None,
                                  date_to: datetime = None) -> GranuleStatus:
         """
@@ -77,11 +76,11 @@ class IngestionHistory(ABC):
                         should fall in order to be "desired".
         :return: A GranuleStatus enum.
         """
-        signature = self._signature_fun(file_path) if self._signature_fun else self._signature_from_timestamp(modified_datetime.timestamp())
+        signature = self._signature_fun(file_path) if self._signature_fun else str(modified_timestamp)
 
-        if self._in_time_range(modified_datetime, start_date=self._latest_ingested_mtime()):
+        if self._in_time_range(modified_timestamp, start_date=self._latest_ingested_mtime()):
             return GranuleStatus.DESIRED_FORWARD_PROCESSING
-        elif self._in_time_range(modified_datetime, date_from, date_to) and not await self._already_ingested(file_path, signature):
+        elif self._in_time_range(modified_timestamp, date_from, date_to) and not await self._already_ingested(file_path, signature):
             return GranuleStatus.DESIRED_HISTORICAL
         else:
             return GranuleStatus.UNDESIRED
@@ -127,18 +126,14 @@ class IngestionHistory(ABC):
         pass
 
     @staticmethod
-    def _in_time_range(date: datetime, start_date: datetime = None, end_date: datetime = None):
+    def _in_time_range(timestamp: int, start_date: datetime = None, end_date: datetime = None):
         """
         :param file: file path as a string
         :param date_from: timestamp, can be None
         :param date_to: timestamp, can be None
         :return: True is the update time of the file is between ts_from and ts_to. False otherwise
         """
-        is_after_from = start_date.timestamp() < date.timestamp() if start_date else True
-        is_before_to = end_date.timestamp() > date.timestamp() if end_date else True
+        is_after_from = int(start_date.timestamp()) < timestamp if start_date else True
+        is_before_to = int(end_date.timestamp()) > timestamp if end_date else True
 
         return is_after_from and is_before_to
-
-    @staticmethod
-    def _signature_from_timestamp(timestamp: float):
-        return str(int(timestamp))
diff --git a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
index ebed073..c6d26a5 100644
--- a/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/SolrIngestionHistory.py
@@ -64,7 +64,7 @@ class SolrIngestionHistory(IngestionHistory):
             self._solr_datasets.add([{
                 'id': self._dataset_id,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': int(self._latest_ingested_file_update)}])
+                'latest_update_l': self._latest_ingested_file_update}])
             self._solr_datasets.commit()
 
     def _get_latest_file_update(self):


Mime
View raw message