sdap-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eamonf...@apache.org
Subject [incubator-sdap-ingester] 06/06: wip
Date Wed, 21 Oct 2020 23:57:35 GMT
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 2eb7aafa7130366dd03889a7655c169c48ff6a94
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Wed Oct 21 16:41:59 2020 -0700

    wip
---
 .../collection_manager/entities/Collection.py      | 12 ++++++++
 .../collection_manager/entities/__init__.py        |  1 +
 collection_manager/collection_manager/main.py      |  3 +-
 .../services/CollectionWatcher.py                  | 33 ++++++++++++----------
 .../collection_manager/services/S3Observer.py      |  2 ++
 collection_manager/requirements.txt                |  3 +-
 6 files changed, 37 insertions(+), 17 deletions(-)

diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index aa700cd..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -5,10 +5,16 @@ from datetime import datetime
 from fnmatch import fnmatch
 from glob import glob
 from typing import List, Optional
+from enum import Enum
 
 from collection_manager.entities.exceptions import MissingValueCollectionError
 
 
+class CollectionStorageType(Enum):
+    LOCAL = 1
+    S3 = 2
+
+
 @dataclass(frozen=True)
 class Collection:
     dataset_id: str
@@ -40,6 +46,12 @@ class Collection:
         except KeyError as e:
             raise MissingValueCollectionError(missing_value=e.args[0])
 
+    def storage_type(self):
+        if urlparse(self.path).scheme == 's3':
+            return CollectionStorageType.S3
+        else:
+            return CollectionStorageType.LOCAL
+
     def directory(self):
         if urlparse(self.path).scheme == 's3':
             return self.path
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
 from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3de4fdd..044cb87 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -71,7 +71,8 @@ async def main():
                                                        history_manager_builder=history_manager_builder)
             collection_watcher = CollectionWatcher(collections_path=options.collections_path,
                                                    granule_updated_callback=collection_processor.process_granule,
-                                                   collections_refresh_interval=int(options.refresh))
+                                                   collections_refresh_interval=int(options.refresh),
+                                                   s3=True)
 
             await collection_watcher.start_watching()
             while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 215e80e..87b1ac3 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
 import asyncio
+from collection_manager.entities.Collection import CollectionStorageType
 from collection_manager.services.S3Observer import S3Observer
 import logging
 import os
@@ -69,11 +70,15 @@ class CollectionWatcher:
         return {collection for collections in self._collections_by_dir.values() for collection in collections}
 
     def _validate_collection(self, collection: Collection):
-        directory = collection.directory()
-        if not os.path.isabs(directory):
-            raise RelativePathCollectionError(collection=collection)
-        if directory == os.path.dirname(self._collections_path):
-            raise ConflictingPathCollectionError(collection=collection)
+        if collection.storage_type() == CollectionStorageType.S3:
+            # do some S3 path validation here
+            return
+        else:
+            directory = collection.directory()
+            if not os.path.isabs(directory):
+                raise RelativePathCollectionError(collection=collection)
+            if directory == os.path.dirname(self._collections_path):
+                raise ConflictingPathCollectionError(collection=collection)
 
     def _load_collections(self):
         try:
@@ -185,18 +190,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_created(self, event):
         super().on_created(event)
-        for collection in self._collections_for_dir:
-            try:
-                if collection.owns_file(event.src_path):
-                    self._loop.create_task(self._callback(event.src_path, collection))
-            except IsADirectoryError:
-                pass
+        self._handle_event(event)
 
     def on_modified(self, event):
         super().on_modified(event)
-        # if os.path.isdir(event.src_path):
-        #     return
-        if type(event) == FileModifiedEvent:
-            for collection in self._collections_for_dir:
+        self._handle_event(event)
+
+    def _handle_event(self, event):
+        for collection in self._collections_for_dir:
+            try:
                 if collection.owns_file(event.src_path):
                     self._loop.create_task(self._callback(event.src_path, collection))
+            except IsADirectoryError:
+                return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 7720432..376a907 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -103,6 +103,8 @@ class S3Observer:
         start = time.perf_counter()
         async with aioboto3.resource("s3") as s3:
             bucket = await s3.Bucket(self._bucket)
+
+            # we need the key without the bucket name
             async for file in bucket.objects.filter(Prefix=path):
                 new_cache[file.key] = await file.last_modified
         end = time.perf_counter()
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 34f1334..3402a73 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -5,4 +5,5 @@ watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
 tenacity==6.2.0
-aioboto3==8.0.5
\ No newline at end of file
+aioboto3==8.0.5
+aiohttp==3.6.2
\ No newline at end of file


Mime
View raw message