This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit e26891a5f7b9111909eb167f38d1f1a1ad48a0ed
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Tue Oct 6 13:15:23 2020 -0700
Create S3Observer
---
.../services/CollectionWatcher.py | 14 +--
.../collection_manager/services/S3Observer.py | 140 +++++++++++++++++++++
.../collection_manager/services/__init__.py | 1 +
collection_manager/requirements.txt | 3 +-
4 files changed, 150 insertions(+), 8 deletions(-)
diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 20ec7c7..499a17d 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -12,7 +12,7 @@ from collection_manager.entities.exceptions import (
CollectionConfigFileNotFoundError, CollectionConfigParsingError,
ConflictingPathCollectionError, MissingValueCollectionError,
RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
from watchdog.observers.polling import PollingObserver as Observer
logger = logging.getLogger(__name__)
@@ -179,9 +179,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
def on_modified(self, event):
super().on_modified(event)
- if os.path.isdir(event.src_path):
- return
-
- for collection in self._collections_for_dir:
- if collection.owns_file(event.src_path):
- self._loop.create_task(self._callback(event.src_path, collection))
+ # if os.path.isdir(event.src_path):
+ # return
+ if type(event) == FileModifiedEvent:
+ for collection in self._collections_for_dir:
+ if collection.owns_file(event.src_path):
+ self._loop.create_task(self._callback(event.src_path, collection))
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..9a86d1e
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,140 @@
+import asyncio
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
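+# Point boto3/aioboto3 at the "saml-pub" AWS profile and the us-west-2 region.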
+os.environ['AWS_PROFILE'] = "saml-pub"
+os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
+
+
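+# Minimal event classes mirroring watchdog's FileCreatedEvent/FileModifiedEvent.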
+@dataclass
+class S3Event:
+ src_path: str
+
+
+class S3FileModifiedEvent(S3Event):
+ pass
+
+
+class S3FileCreatedEvent(S3Event):
+ pass
+
+
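+# Associates a watched S3 prefix with the event handler that receives its events.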
+class S3Watch(object):
+ def __init__(self, path: str, event_handler) -> None:
+ self.path = path
+ self.event_handler = event_handler
+
+
+class S3Observer:
+
+ def __init__(self, bucket, initial_scan=False) -> None:
+ self._bucket = bucket
+ self._cache: Dict[str, datetime.datetime] = {}
+ self._initial_scan = initial_scan
+ self._watches: Set[S3Watch] = set()
+
+ self._has_polled = False
+
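+ # Run an initial poll, then re-poll every 30 seconds via the event loop.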
+ async def start(self):
+ await self._run_periodically(loop=None,
+ wait_time=30,
+ func=self._poll)
+
+ def unschedule(self, watch: S3Watch):
+ self._watches.remove(watch)
+
+ def schedule(self, path: str, event_handler):
+ watch = S3Watch(path=path, event_handler=event_handler)
+ self._watches.add(watch)
+ return watch
+
+ @classmethod
+ async def _run_periodically(cls,
+ loop: Optional[asyncio.AbstractEventLoop],
+ wait_time: float,
+ func: Callable[[any], Awaitable],
+ *args,
+ **kwargs):
+ """
+ Call a function periodically. This uses asyncio, and is non-blocking.
+ :param loop: An optional event loop to use. If None, the current running event loop will be used.
+ :param wait_time: seconds to wait between iterations of func
+ :param func: the async function that will be awaited
+ :param args: any args that need to be provided to func
+ """
+ if loop is None:
+ loop = asyncio.get_running_loop()
+ await func(*args, **kwargs)
+ loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
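+ # Build a fresh {key: last_modified} snapshot across all watches, then diff it
+ # against the previous snapshot to emit created/modified events.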
+ async def _poll(self):
+ new_cache = {}
+ watch_index = {}
+
+ for watch in self._watches:
+ new_cache_for_watch = await self._get_s3_files(watch.path)
+ new_index = {file: watch for file in new_cache_for_watch}
+
+ new_cache = {**new_cache, **new_cache_for_watch}
+ watch_index = {**watch_index, **new_index}
+ difference = set(new_cache.items()) - set(self._cache.items())
+
+ if self._has_polled or self._initial_scan:
+ for (file, modified_date) in difference:
+ watch = watch_index[file]
+ file_is_new = file not in self._cache
+
+ if file_is_new:
+ watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+ else:
+ watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+
+ self._cache = new_cache
+ self._has_polled = True
+
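+ # List every object under the given prefix and return a {key: last_modified} dict.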
+ async def _get_s3_files(self, path: str):
+ new_cache = {}
+
+ start = time.perf_counter()
+ async with aioboto3.resource("s3") as s3:
+ bucket = await s3.Bucket(self._bucket)
+ async for file in bucket.objects.filter(Prefix=path):
+ new_cache[file.key] = await file.last_modified
+ end = time.perf_counter()
+ duration = end - start
+
+ print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+ return new_cache
+
+
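+# Manual smoke test: watch two prefixes in the "nexus-ingest" bucket and print events.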
+async def test():
+ observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+ handler = Handler()
+ observer.schedule('avhrr/2012', handler)
+ observer.schedule('avhrr/2013', handler)
+
+ await observer.start()
+
+ while True:
+ try:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ return
+
+
+class Handler:
+ def on_created(self, event: S3Event):
+ print(f"File created: {event.src_path}")
+
+ def on_modified(self, event: S3Event):
+ print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+ asyncio.run(test())
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
from .CollectionProcessor import CollectionProcessor
from .CollectionWatcher import CollectionWatcher
from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index ee12c89..34f1334 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,4 +4,5 @@ pysolr==3.9.0
watchdog==0.10.2
requests==2.23.0
aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
\ No newline at end of file