sdap-commits mailing list archives

From eamonf...@apache.org
Subject [incubator-sdap-ingester] 03/06: Create S3Observer
Date Wed, 21 Oct 2020 23:42:46 GMT
This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch s3-support
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit e26891a5f7b9111909eb167f38d1f1a1ad48a0ed
Author: Eamon Ford <eamonford@gmail.com>
AuthorDate: Tue Oct 6 13:15:23 2020 -0700

    Create S3Observer
---
 .../services/CollectionWatcher.py                  |  14 +--
 .../collection_manager/services/S3Observer.py      | 140 +++++++++++++++++++++
 .../collection_manager/services/__init__.py        |   1 +
 collection_manager/requirements.txt                |   3 +-
 4 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 20ec7c7..499a17d 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -12,7 +12,7 @@ from collection_manager.entities.exceptions import (
     CollectionConfigFileNotFoundError, CollectionConfigParsingError,
     ConflictingPathCollectionError, MissingValueCollectionError,
     RelativePathCollectionError, RelativePathError)
-from watchdog.events import FileSystemEventHandler
+from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
 from watchdog.observers.polling import PollingObserver as Observer
 
 logger = logging.getLogger(__name__)
@@ -179,9 +179,9 @@ class _GranuleEventHandler(FileSystemEventHandler):
 
     def on_modified(self, event):
         super().on_modified(event)
-        if os.path.isdir(event.src_path):
-            return
-
-        for collection in self._collections_for_dir:
-            if collection.owns_file(event.src_path):
-                self._loop.create_task(self._callback(event.src_path, collection))
+        # Filter on the event type rather than os.path.isdir, so directory
+        # events are skipped without touching the local filesystem.
+        if isinstance(event, FileModifiedEvent):
+            for collection in self._collections_for_dir:
+                if collection.owns_file(event.src_path):
+                    self._loop.create_task(self._callback(event.src_path, collection))
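
For context: watchdog dispatches separate event classes for files and directories (FileModifiedEvent vs. DirModifiedEvent), which is why a check on the event class can replace the old os.path.isdir guard. A minimal sketch of that dispatch behavior, independent of this commit (the handler and paths below are illustrative only):

    from watchdog.events import (DirModifiedEvent, FileModifiedEvent,
                                 FileSystemEventHandler)

    class FilesOnlyHandler(FileSystemEventHandler):
        def on_modified(self, event):
            # DirModifiedEvent and FileModifiedEvent are sibling classes,
            # so a type check cleanly separates directory churn from file edits.
            if isinstance(event, FileModifiedEvent):
                print(f"file changed: {event.src_path}")

    handler = FilesOnlyHandler()
    handler.dispatch(FileModifiedEvent("/tmp/granule.nc"))  # handled
    handler.dispatch(DirModifiedEvent("/tmp"))              # ignored
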
diff --git a/collection_manager/collection_manager/services/S3Observer.py b/collection_manager/collection_manager/services/S3Observer.py
new file mode 100644
index 0000000..9a86d1e
--- /dev/null
+++ b/collection_manager/collection_manager/services/S3Observer.py
@@ -0,0 +1,140 @@
+import asyncio
+import datetime
+import os
+import time
+from dataclasses import dataclass
+from typing import Set, Dict, Optional, Callable, Awaitable
+
+import aioboto3
+
+os.environ['AWS_PROFILE'] = "saml-pub"
+os.environ['AWS_DEFAULT_REGION'] = "us-west-2"
+
+
+@dataclass
+class S3Event:
+    src_path: str
+
+
+class S3FileModifiedEvent(S3Event):
+    pass
+
+
+class S3FileCreatedEvent(S3Event):
+    pass
+
+
+class S3Watch(object):
+    def __init__(self, path: str, event_handler) -> None:
+        self.path = path
+        self.event_handler = event_handler
+
+
+class S3Observer:
+
+    def __init__(self, bucket, initial_scan=False) -> None:
+        self._bucket = bucket
+        self._cache: Dict[str, datetime.datetime] = {}
+        self._initial_scan = initial_scan
+        self._watches: Set[S3Watch] = set()
+
+        self._has_polled = False
+
+    async def start(self):
+        await self._run_periodically(loop=None,
+                                     wait_time=30,
+                                     func=self._poll)
+
+    def unschedule(self, watch: S3Watch):
+        self._watches.remove(watch)
+
+    def schedule(self, path: str, event_handler):
+        watch = S3Watch(path=path, event_handler=event_handler)
+        self._watches.add(watch)
+        return watch
+
+    @classmethod
+    async def _run_periodically(cls,
+                                loop: Optional[asyncio.AbstractEventLoop],
+                                wait_time: float,
+                                func: Callable[..., Awaitable],
+                                *args,
+                                **kwargs):
+        """
+        Call a function periodically. This uses asyncio, and is non-blocking.
+        :param loop: An optional event loop to use. If None, the current running event loop will be used.
+        :param wait_time: seconds to wait between iterations of func
+        :param func: the async function that will be awaited
+        :param args: any args that need to be provided to func
+        """
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        await func(*args, **kwargs)
+        loop.call_later(wait_time, loop.create_task, cls._run_periodically(loop, wait_time, func, *args, **kwargs))
+
+    async def _poll(self):
+        new_cache = {}
+        watch_index = {}
+
+        for watch in self._watches:
+            new_cache_for_watch = await self._get_s3_files(watch.path)
+            new_index = {file: watch for file in new_cache_for_watch}
+
+            new_cache = {**new_cache, **new_cache_for_watch}
+            watch_index = {**watch_index, **new_index}
+        difference = set(new_cache.items()) - set(self._cache.items())
+
+        if self._has_polled or self._initial_scan:
+            for (file, modified_date) in difference:
+                watch = watch_index[file]
+                file_is_new = file not in self._cache
+
+                if file_is_new:
+                    watch.event_handler.on_created(S3FileCreatedEvent(src_path=file))
+                else:
+                    watch.event_handler.on_modified(S3FileModifiedEvent(src_path=file))
+
+        self._cache = new_cache
+        self._has_polled = True
+
+    async def _get_s3_files(self, path: str):
+        new_cache = {}
+
+        start = time.perf_counter()
+        async with aioboto3.resource("s3") as s3:
+            bucket = await s3.Bucket(self._bucket)
+            async for file in bucket.objects.filter(Prefix=path):
+                new_cache[file.key] = await file.last_modified
+        end = time.perf_counter()
+        duration = end - start
+
+        print(f"Retrieved {len(new_cache)} objects in {duration}")
+
+        return new_cache
+
+
+async def test():
+    observer = S3Observer(bucket="nexus-ingest", initial_scan=False)
+    handler = Handler()
+    observer.schedule('avhrr/2012', handler)
+    observer.schedule('avhrr/2013', handler)
+
+    await observer.start()
+
+    while True:
+        try:
+            await asyncio.sleep(1)
+        except KeyboardInterrupt:
+            return
+
+
+class Handler:
+    def on_created(self, event: S3Event):
+        print(f"File created: {event.src_path}")
+
+    def on_modified(self, event: S3Event):
+        print(f"File modified: {event.src_path}")
+
+
+if __name__ == "__main__":
+    asyncio.run(test())
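
The _poll method above detects both new and modified objects with a single set difference over (key, last_modified) pairs: a rewritten object gets a fresh timestamp, so its pair is absent from the old cache, while deleted objects simply drop out. A self-contained sketch of that diffing step (keys and dates are made up for illustration):

    import datetime

    # Old snapshot: key -> last_modified, as built by _get_s3_files
    old = {"avhrr/2012/a.nc": datetime.datetime(2020, 1, 1),
           "avhrr/2012/b.nc": datetime.datetime(2020, 1, 2)}

    # New snapshot: b.nc was rewritten, c.nc appeared
    new = {"avhrr/2012/a.nc": datetime.datetime(2020, 1, 1),
           "avhrr/2012/b.nc": datetime.datetime(2020, 1, 3),
           "avhrr/2012/c.nc": datetime.datetime(2020, 1, 4)}

    # Pairs in new but not in old cover both cases: a new key was never
    # cached, and a modified key carries a timestamp the cache has not seen.
    for key, modified in set(new.items()) - set(old.items()):
        event = "created" if key not in old else "modified"
        print(f"{event}: {key}")
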
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 635d3dc..553e1b7 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -16,3 +16,4 @@
 from .CollectionProcessor import CollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
+from .S3Observer import S3Observer
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index ee12c89..34f1334 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -4,4 +4,5 @@ pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
 aio-pika==6.6.1
-tenacity==6.2.0
\ No newline at end of file
+tenacity==6.2.0
+aioboto3==8.0.5
\ No newline at end of file
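
One caveat on the new module: S3Observer.py assigns AWS_PROFILE and AWS_DEFAULT_REGION unconditionally at import time, which overwrites whatever credentials the deployment has already configured. A gentler sketch, reusing the same profile and region values from the commit, would defer to existing settings:

    import os

    # setdefault only fills in a value when the variable is unset, so
    # credentials configured by the environment take precedence.
    os.environ.setdefault('AWS_PROFILE', 'saml-pub')
    os.environ.setdefault('AWS_DEFAULT_REGION', 'us-west-2')
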

