This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit c56cd915a342f57d49912f63555fd93a15093099
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Wed Oct 21 16:41:59 2020 -0700
wip
---
.../collection_manager/entities/Collection.py | 12 ++++++++
.../collection_manager/entities/__init__.py | 1 +
collection_manager/collection_manager/main.py | 3 +-
.../services/CollectionWatcher.py | 33 ++++++++++++----------
.../collection_manager/services/S3Observer.py | 2 ++
collection_manager/requirements.txt | 3 +-
6 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py b/collection_manager/collection_manager/entities/Collection.py
index aa700cd..7a45b66 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -5,10 +5,16 @@ from datetime import datetime
from fnmatch import fnmatch
from glob import glob
from typing import List, Optional
+from enum import Enum
from collection_manager.entities.exceptions import MissingValueCollectionError
+class CollectionStorageType(Enum):
+ LOCAL = 1
+ S3 = 2
+
+
@dataclass(frozen=True)
class Collection:
dataset_id: str
@@ -40,6 +46,12 @@ class Collection:
except KeyError as e:
raise MissingValueCollectionError(missing_value=e.args[0])
+ def storage_type(self):
+ if urlparse(self.path).scheme == 's3':
+ return CollectionStorageType.S3
+ else:
+ return CollectionStorageType.LOCAL
+
def directory(self):
if urlparse(self.path).scheme == 's3':
return self.path
diff --git a/collection_manager/collection_manager/entities/__init__.py b/collection_manager/collection_manager/entities/__init__.py
index 165341b..b9c7a25 100644
--- a/collection_manager/collection_manager/entities/__init__.py
+++ b/collection_manager/collection_manager/entities/__init__.py
@@ -1 +1,2 @@
from .Collection import Collection
+from .Collection import CollectionStorageType
\ No newline at end of file
diff --git a/collection_manager/collection_manager/main.py b/collection_manager/collection_manager/main.py
index 3de4fdd..044cb87 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -71,7 +71,8 @@ async def main():
history_manager_builder=history_manager_builder)
collection_watcher = CollectionWatcher(collections_path=options.collections_path,
granule_updated_callback=collection_processor.process_granule,
- collections_refresh_interval=int(options.refresh))
+ collections_refresh_interval=int(options.refresh),
+ s3=True)
await collection_watcher.start_watching()
while True:
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 215e80e..87b1ac3 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -1,4 +1,5 @@
import asyncio
+from collection_manager.entities.Collection import CollectionStorageType
from collection_manager.services.S3Observer import S3Observer
import logging
import os
@@ -69,11 +70,15 @@ class CollectionWatcher:
return {collection for collections in self._collections_by_dir.values() for collection
in collections}
def _validate_collection(self, collection: Collection):
- directory = collection.directory()
- if not os.path.isabs(directory):
- raise RelativePathCollectionError(collection=collection)
- if directory == os.path.dirname(self._collections_path):
- raise ConflictingPathCollectionError(collection=collection)
+ if collection.storage_type() == CollectionStorageType.S3:
+ # do some S3 path validation here
+ return
+ else:
+ directory = collection.directory()
+ if not os.path.isabs(directory):
+ raise RelativePathCollectionError(collection=collection)
+ if directory == os.path.dirname(self._collections_path):
+ raise ConflictingPathCollectionError(collection=collection)
def _load_collections(self):
try:
@@ -185,18 +190,16 @@ class _GranuleEventHandler(FileSystemEventHandler):
def on_created(self, event):
super().on_created(event)
- for collection in self._collections_for_dir:
- try:
- if collection.owns_file(event.src_path):
- self._loop.create_task(self._callback(event.src_path, collection))
- except IsADirectoryError:
- pass
+ self._handle_event(event)
def on_modified(self, event):
super().on_modified(event)
- # if os.path.isdir(event.src_path):
- # return
- if type(event) == FileModifiedEvent:
- for collection in self._collections_for_dir:
+ self._handle_event(event)
+
+ def _handle_event(self, event):
+ for collection in self._collections_for_dir:
+ try:
if collection.owns_file(event.src_path):
self._loop.create_task(self._callback(event.src_path, collection))
+ except IsADirectoryError:
+ return
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
index 7720432..376a907 100644
--- a/collection_manager/collection_manager/services/S3Observer.py
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -103,6 +103,8 @@ class S3Observer:
start = time.perf_counter()
async with aioboto3.resource("s3") as s3:
bucket = await s3.Bucket(self._bucket)
+
+ # we need the key without the bucket name
async for file in bucket.objects.filter(Prefix=path):
new_cache[file.key] = await file.last_modified
end = time.perf_counter()
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index 34f1334..3402a73 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -5,4 +5,5 @@ watchdog==0.10.2
requests==2.23.0
aio-pika==6.6.1
tenacity==6.2.0
-aioboto3==8.0.5
\ No newline at end of file
+aioboto3==8.0.5
+aiohttp==3.6.2
\ No newline at end of file
|