sdap-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eamonf...@apache.org
Subject [incubator-sdap-ingester] branch s3-support updated: fixed scanning weirdness
Date Thu, 29 Oct 2020 23:33:09 GMT
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/s3-support by this push:
     new 100a9bb  fixed scanning weirdness
100a9bb is described below

commit 100a9bbebe4e4b7c21c81c7a53f289bedf63d701
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Thu Oct 29 16:32:57 2020 -0700

    fixed scanning weirdness
---
 .../services/CollectionWatcher.py                  | 32 ++++++++++++----------
 1 file changed, 18 insertions(+), 14 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index f885b1c..1fb6abd 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,16 +1,15 @@
 import asyncio
 from datetime import datetime
-from collection_manager.entities.Collection import CollectionStorageType
+from collection_manager.entities.Collection import CollectionStorageType, Collection
 from collection_manager.services.S3Observer import S3Event, S3Observer
 import logging
 import os
 import time
 from collections import defaultdict
 from glob import glob
-from typing import Awaitable, Callable, Dict, Optional, Set
+from typing import Awaitable, Callable, Dict, List, Optional, Set
 
 import yaml
-from collection_manager.entities import Collection
 from collection_manager.entities.exceptions import (CollectionConfigFileNotFoundError,
                                                     CollectionConfigParsingError,
                                                     ConflictingPathCollectionError,
@@ -58,7 +57,7 @@ class CollectionWatcher:
                                      wait_time=self._collections_refresh_interval,
                                      func=self._reload_and_reschedule)
 
-        if type(self._observer) == S3Observer:
+        if isinstance(self._observer, S3Observer):
             await self._observer.start()
         else:
             self._observer.start()
@@ -117,18 +116,23 @@ class CollectionWatcher:
         self._load_collections()
         return self._collections() - old_collections
 
+    async def _call_callback_for_all_granules(self, collections: List[Collection]):
+        logger.info(f"Scanning files for {len(collections)} collections...")
+        start = time.perf_counter()
+        for collection in collections:
+            for granule_path in glob(collection.path, recursive=True):
+                modified_time = os.path.getmtime(granule_path)
+                await self._granule_updated_callback(granule_path, modified_time, collection)
+        logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+
     async def _reload_and_reschedule(self):
         try:
             updated_collections = self._get_updated_collections()
             if len(updated_collections) > 0:
-                logger.info(f"Scanning files for {len(updated_collections)} collections...")
-                start = time.perf_counter()
-                for collection in updated_collections:
-                    files_owned = glob(collection.path, recursive=True)
-                    for granule in files_owned:
-                        await self._granule_updated_callback(granule, collection)
-
-                logger.info(f"Finished scanning files in {time.perf_counter() - start} seconds.")
+                # For S3 collections, the S3Observer will report as new any files that haven't already been scanned.
+                # So we only need to rescan granules here if not using S3.
+                if not isinstance(self._observer, S3Observer):
+                    await self._call_callback_for_all_granules(collections=updated_collections)
 
                 self._unschedule_watches()
                 self._schedule_watches()
@@ -148,7 +152,7 @@ class CollectionWatcher:
             # Note: the Watchdog library does not schedule a new watch
             # if one is already scheduled for the same directory
             try:
-                if type(self._observer) == S3Observer:
+                if isinstance(self._observer, S3Observer):
+                    self._granule_watches.add(self._observer.schedule(granule_event_handler, directory))
                 else:
                     self._granule_watches.add(self._observer.schedule(granule_event_handler, directory, recursive=True))
@@ -205,7 +209,7 @@ class _GranuleEventHandler(FileSystemEventHandler):
                     if isinstance(event, S3Event):
                         modified_time = event.modified_time
                     else:
-                        modified_time = os.path.getmtime(path)
+                        modified_time = datetime.fromtimestamp(os.path.getmtime(path))
                     self._loop.create_task(self._callback(path, modified_time, collection))
             except IsADirectoryError:
                 return


Mime
View raw message