Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EF828200BC4 for ; Fri, 4 Nov 2016 16:35:57 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id ECABD160B04; Fri, 4 Nov 2016 15:35:57 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9AF53160AFE for ; Fri, 4 Nov 2016 16:35:55 +0100 (CET) Received: (qmail 64112 invoked by uid 500); 4 Nov 2016 15:35:54 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 64066 invoked by uid 99); 4 Nov 2016 15:35:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Nov 2016 15:35:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6FF8BE0C0A; Fri, 4 Nov 2016 15:35:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: scnakandala@apache.org To: commits@airavata.apache.org Date: Fri, 04 Nov 2016 15:35:55 -0000 Message-Id: <7f20952849e3407880369f87b0bdd396@git.apache.org> In-Reply-To: <760f5314799149bda6a924b9da38baf7@git.apache.org> References: <760f5314799149bda6a924b9da38baf7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] airavata git commit: cleaning modules archived-at: Fri, 04 Nov 2016 15:35:58 -0000 http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/amqpwstunnel/python/amqpwstunnel.py ---------------------------------------------------------------------- diff --git a/sandbox/amqpwstunnel/python/amqpwstunnel.py b/sandbox/amqpwstunnel/python/amqpwstunnel.py new file mode 100644 index 0000000..af5d68a --- /dev/null +++ b/sandbox/amqpwstunnel/python/amqpwstunnel.py @@ -0,0 +1,583 @@ +import argparse +import base64 +import functools +import json +import sys +import uuid +import weakref + +from threading import Thread, Lock + +try: + from urllib.parse import urlencode +except ImportError: + from urllib import urlencode + +import pika +import tornado.websocket +import tornado.ioloop +import tornado.auth +import tornado.escape +import tornado.concurrent + + +SETTINGS = {} + + +class Error(Exception): + """Base error class for exceptions in this module""" + pass + +class ConsumerConfigError(Error): + """Raised when an issue with consumer configuration occurs""" + def __init__(self, message): + self.message = message + +class ConsumerKeyError(Error): + def __init__(self, message, key): + self.message = message + self.key = key + +class AuthError(Error): + """Raised when something went wrong during authentication""" + def __init__(self, error, code): + self.message = error + self.code = code + + + +class PikaAsyncConsumer(Thread): + + """ + The primary entry point for routing incoming messages to the proper handler. + + """ + + def __init__(self, rabbitmq_url, exchange_name, queue_name, + exchange_type="direct", routing_key="#"): + """ + Create a new instance of Streamer. + + Arguments: + rabbitmq_url -- URL to RabbitMQ server + exchange_name -- name of RabbitMQ exchange to join + queue_name -- name of RabbitMQ queue to join + + Keyword Arguments: + exchange_type -- one of 'direct', 'topic', 'fanout', 'headers' + (default 'direct') + routing_keys -- the routing key that this consumer listens for + (default '#', receives all messages) + + """ + print("Creating new consumer") + super(PikaAsyncConsumer, self).__init__(daemon=True) + self._connection = None + self._channel = None + self._shut_down = False + self._consumer_tag = None + self._url = rabbitmq_url + self._client_list = [] + self._lock = Lock() + + # The following are necessary to guarantee that both the RabbitMQ + # server and Streamer know where to look for messages. These names will + # be decided before dispatch and should be recorded in a config file or + # else on a per-job basis. + self._exchange = exchange_name + self._exchange_type = exchange_type + self._queue = queue_name + self._routing_key = routing_key + + def add_client(self, client): + """Add a new client to the recipient list. + + Arguments: + client -- a reference to the client object to add + """ + self._lock.acquire() + # Create a weakref to ensure that cyclic references to WebSocketHandler + # objects do not cause problems for garbage collection + self._client_list.append(weakref.ref(client)) + self._lock.release() + + def remove_client(self, client): + """Remove a client from the recipient list. + + Arguments: + client -- a reference to the client object to remove + """ + self._lock.acquire() + for i in range(0, len(self._client_list)): + # Parentheses after _client_list[i] to deference the weakref to its + # strong reference + if self._client_list[i]() is client: + self._client_list.pop(i) + break + self._lock.release() + + + def connect(self): + """ + Create an asynchronous connection to the RabbitMQ server at URL. + + """ + return pika.SelectConnection(pika.URLParameters(self._url), + on_open_callback=self.on_connection_open, + on_close_callback=self.on_connection_close, + stop_ioloop_on_close=False) + + def on_connection_open(self, unused_connection): + """ + Actions to perform when the connection opens. This may not happen + immediately, so defer action to this callback. + + Arguments: + unused_connection -- the created connection (by this point already + available as self._connection) + + """ + self._connection.channel(on_open_callback=self.on_channel_open) + + def on_connection_close(self, connection, code, text): + """ + Actions to perform when the connection is unexpectedly closed by the + RabbitMQ server. + + Arguments: + connection -- the connection that was closed (same as self._connection) + code -- response code from the RabbitMQ server + text -- response body from the RabbitMQ server + + """ + self._channel = None + if self._shut_down: + self._connection.ioloop.stop() + else: + self._connection.add_timeout(5, self.reconnect) + + def reconnect(self): + """ + Attempt to reestablish a connection with the RabbitMQ server. + """ + self._connection.ioloop.stop() # Stop the ioloop to completely close + + if not self._shut_down: # Connect and restart the ioloop + self._connection = self.connect() + self._connection.ioloop.start() + + def on_channel_open(self, channel): + """ + Store the opened channel for future use and set up the exchange and + queue to be used. + + Arguments: + channel -- the Channel instance opened by the Channel.Open RPC + """ + self._channel = channel + self._channel.add_on_close_callback(self.on_channel_close) + self.declare_exchange() + + + def on_channel_close(self, channel, code, text): + """ + Actions to perform when the channel is unexpectedly closed by the + RabbitMQ server. + + Arguments: + connection -- the connection that was closed (same as self._connection) + code -- response code from the RabbitMQ server + text -- response body from the RabbitMQ server + """ + self._connection.close() + + def declare_exchange(self): + """ + Set up the exchange that will route messages to this consumer. Each + RabbitMQ exchange is uniquely identified by its name, so it does not + matter if the exchange has already been declared. + """ + self._channel.exchange_declare(self.declare_exchange_success, + self._exchange, + self._exchange_type) + + def declare_exchange_success(self, unused_connection): + """ + Actions to perform on successful exchange declaration. + """ + self.declare_queue() + + def declare_queue(self): + """ + Set up the queue that will route messages to this consumer. Each + RabbitMQ queue can be defined with routing keys to use only one + queue for multiple jobs. + """ + self._channel.queue_declare(self.declare_queue_success, + self._queue) + + def declare_queue_success(self, method_frame): + """ + Actions to perform on successful queue declaration. + """ + self._channel.queue_bind(self.munch, + self._queue, + self._exchange, + self._routing_key + ) + + def munch(self, unused): + """ + Begin consuming messages from the Airavata API server. + """ + self._channel.add_on_cancel_callback(self.cancel_channel) + self._consumer_tag = self._channel.basic_consume(self._process_message) + + def cancel_channel(self, method_frame): + if self._channel is not None: + self._channel._close() + + def _process_message(self, ch, method, properties, body): + """ + Receive and verify a message, then pass it to the router. + + Arguments: + ch -- the channel that routed the message + method -- delivery information + properties -- message properties + body -- the message + """ + print("Received Message: %s" % body) + self._lock.acquire() + for client in self._client_list: + # Parentheses after client to deference the weakref to its + # strong reference + client().write_message(body) + self._lock.release() + self._channel.basic_ack(delivery_tag=method.delivery_tag) + + def stop_consuming(self): + """ + Stop the consumer if active. + """ + if self._channel: + self._channel.basic_cancel(self.close_channel, self._consumer_tag) + + def close_channel(self, unused): + """ + Close the channel to shut down the consumer and connection. + """ + self._channel.queue_delete(queue=self._queue) + self._channel.close() + + def run(self): + """ + Start a connection with the RabbitMQ server. + """ + self._connection = self.connect() + self._connection.ioloop.start() + + def stop(self): + """ + Stop an active connection with the RabbitMQ server. + """ + self._closing = True + self.stop_consuming() + + +class Wso2OAuth2Mixin(tornado.auth.OAuth2Mixin): + _OAUTH_AUTHORIZE_URL = "https://idp.scigap.org:9443/oauth2/authorize" + _OAUTH_ACCESS_TOKEN_URL = "https://idp.scigap.org:9443/oauth2/token" + + @tornado.auth._auth_return_future + def get_authenticated_user(self, username, password, callback=None): + print("Authenticating user %s" % (username)) + http = self.get_auth_http_client() + body = urlencode({ + "client_id": SETTINGS["oauth_client_key"], + "client_secret": SETTINGS["oauth_client_secret"], + "grant_type": SETTINGS["oauth_grant_type"], + "username": username, + "password": password + }) + http.fetch(self._OAUTH_ACCESS_TOKEN_URL, functools.partial(self._on_access_token, callback), method="POST", body=body) + + def _on_access_token(self, future, response): + if response.error: + print(str(response)) + print(response.body) + print(response.error) + future.set_exception(AuthError(response.error, response.code)) + return + + print(response.body) + future.set_result(tornado.escape.json_decode(response.body)) + +class AuthHandler(tornado.web.RequestHandler, Wso2OAuth2Mixin): + def get_current_user(self): + expires_in = self.get_secure_cookie("expires-in", max_age_days=SETTINGS['maximum_cookie_age']) + print(expires_in) + if expires_in: + return self.get_secure_cookie("ws-auth-token", max_age_days=float(expires_in)) + return None + + def set_default_headers(self): + self.set_header("Content-Type", "text/plain") + self.set_header("Access-Control-Allow-Origin", "*") + self.set_header("Access-Control-Allow-Headers", "x-requested-with") + self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') + + def get(self): + if self.get_current_user(): + self.set_status(200) + print("Authenticated") + self.write("Authenticated") + + else: + self.set_status(403) + print("Not Authenticated") + self.write("Not Authenticated") + + @tornado.gen.coroutine + def post(self): + try: + username = self.get_body_argument("username") + password = self.get_body_argument("password") + redirect = self.get_body_argument("redirect") + if username == "" or password == "": + raise tornado.web.MissingArgumentError + + access = yield self.get_authenticated_user(username, password) + days = (access["expires_in"] / 3600) / 24 # Convert to days + print(days) + self.set_secure_cookie("ws-auth-token", + access["access_token"], + expires_days=days) + self.set_secure_cookie("expires-in", + str(1), + expires_days=SETTINGS['maximum_cookie_age']) + self.write("Success") + except tornado.web.MissingArgumentError: + print("Missing an argument") + self.set_status(400) + self.write("Authentication information missing") + except AuthError as e: + print("The future freaks me out") + self.set_status(access.code) + self.set_header("Content-Type", "text/html") + self.write(access.message) + + success_code = """

Redirecting to %(url)s

+ + """ % { 'url': redirect} + self.set_status(200) + self.redirect(redirect) + #return self.render_string(success_code) + + + +class AMQPWSHandler(tornado.websocket.WebSocketHandler):#, Wso2OAuth2Mixin): + + """ + Pass messages to a connected WebSockets client. + + A subclass of the Tornado WebSocketHandler class, this class takes no + action when receiving a message from the client. Instead, it is associated + with an AMQP consumer and writes a message to the client each time one is + consumed in the queue. + """ + + # def set_default_headers(self): + # self.set_header("Access-Control-Allow-Origin", "*") + # self.set_header("Access-Control-Allow-Headers", "x-requested-with") + # self.set_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS') + + def check_origin(self, origin): + """Check the domain origin of the connection request. + + This can be made more robust to ensure that connections are only + accepted from verified PGAs. + + Arguments: + origin -- the value of the Origin HTTP header + """ + return True + + def open(self, resource_type, resource_id): + """Associate a new connection with a consumer. + + When a new connection is opened, it is a request to retrieve data + from an AMQP queue. The open operation should also do some kind of + authentication. + + Arguments: + resource_type -- "experiment" or "project" or "data" + resource_id -- the Airavata id for the resource + """ + self.stream.set_nodelay(True) + self.resource_id = resource_id + self.write_message("Opened the connection") + + self.add_to_consumer() + + # expires_in = self.get_secure_cookie("expires_in", max_age_days=SETTINGS["maximum_cookie_age"]) + # if expires_in is not None and self.get_secure_cookie("ws-auth-token", max_age_days=float(expires_in)): + # print("Found secure cookie") + # self.write_message("Authenticated") + # self.add_to_consumer() + # else: + # print("Closing connection") + # self.close() + + def on_message(self, message): + """Handle incoming messages from the client. + + Tornado requires subclasses to override this method, however in this + case we do not wish to take any action when receiving a message from + the client. The purpose of this class is only to push messages to the + client. + """ + print(message) + message = tornado.escape.json_decode(message) + access = yield self.get_authenticated_user(message["username"], message["password"]) + access = access + days = (access["expires_in"] / 3600) / 24 # Convert to days + print(days) + self.set_secure_cookie("ws-auth-token", + access["access_token"], + expires_days=days) + self.set_secure_cookie("expires_in", + str(days), + expires_days=SETTINGS['maximum_cookie_age']) + + + def on_close(self): + try: + print("Closing connection") + self.application.remove_client_from_consumer(self.resource_id, self) + except KeyError: + print("Error: resource %s does not exist" % self.resource_id) + finally: + self.close() + + def add_to_consumer(self): + try: + self.application.add_client_to_consumer(self.resource_id, self) + except AttributeError as e: + print("Error: tornado.web.Application object is not AMQPWSTunnel") + print(e) + + +class AMQPWSTunnel(tornado.web.Application): + + """ + Send messages from an AMQP queue to WebSockets clients. + + In addition to the standard Tornado Application class functionality, this + class maintains a list of active AMQP consumers and maps WebSocketHandlers + to the correct consumers. + """ + + def __init__(self, consumer_list=None, consumer_config=None, handlers=None, + default_host='', transforms=None, **settings): + print("Starting AMQP-WS-Tunnel application") + super(AMQPWSTunnel, self).__init__(handlers=handlers, + default_host=default_host, + transforms=transforms, + **settings) + + self.consumer_list = {} if consumer_list is None else consumer_list + if consumer_config is None: + raise ConsumerConfigError("No consumer configuration provided") + self.consumer_config = consumer_config + + def consumer_exists(self, resource_id): + """Determine if a consumer exists for a particular resource. + + Arguments: + resource_id -- the consumer to find + """ + return resource_id in self.consumer_list + + def add_client_to_consumer(self, resource_id, client): + """Add a new client to a consumer's messaging list. + + Arguments: + resource_id -- the consumer to add to + client -- the client to add + """ + if not self.consumer_exists(resource_id): + print("Creating new consumer") + print(self.consumer_config) + consumer = PikaAsyncConsumer(self.consumer_config["rabbitmq_url"], + self.consumer_config["exchange_name"], + self.consumer_config["queue_name"], + exchange_type=self.consumer_config["exchange_type"], + routing_key=resource_id) + print("Adding to consumer list") + self.consumer_list[resource_id] = consumer + print("Starting consumer") + consumer.start() + + print("Adding new client to %s" % (resource_id)) + consumer = self.consumer_list[resource_id] + consumer.add_client(client) + + def remove_client_from_consumer(self, resource_id, client): + """Remove a client from a consumer's messaging list. + + Arguments: + resource_id -- the consumer to remove from + client -- the client to remove + """ + if self.consumer_exists(resource_id): + print("Removing client from %s" % (resource_id)) + self.consumer_list[resource_id].remove_client(client) + #else: + # raise ConsumerKeyError("Trying to remove client from nonexistent consumer", resource_id) + + def shutdown(self): + """Shut down the application and release all resources. + + + """ + for name, consumer in self.consumer_list.items(): + consumer.stop() + #consumer.join() + #self.consumer_list[name] = None + + #self.consumer_list = {} + + + +if __name__ == "__main__": + i = open(sys.argv[1]) + config = json.load(i) + i.close() + + SETTINGS["oauth_client_key"] = config["oauth_client_key"] + SETTINGS["oauth_client_secret"] = config["oauth_client_secret"] + SETTINGS["oauth_grant_type"] = config["oauth_grant_type"] + SETTINGS["maximum_cookie_age"] = config["maximum_cookie_age"] + + settings = { + "cookie_secret": base64.b64encode(uuid.uuid4().bytes + uuid.uuid4().bytes), + #"xsrf_cookies": True + } + + application = AMQPWSTunnel(handlers=[ + (r"/auth", AuthHandler), + (r"/(experiment)/(.+)", AMQPWSHandler) + ], + consumer_config=config, + debug=True, + **settings) + + application.listen(8888) + + try: + tornado.ioloop.IOLoop.current().start() + except KeyboardInterrupt: + application.shutdown() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/amqpwstunnel/python/config.json ---------------------------------------------------------------------- diff --git a/sandbox/amqpwstunnel/python/config.json b/sandbox/amqpwstunnel/python/config.json new file mode 100644 index 0000000..b092001 --- /dev/null +++ b/sandbox/amqpwstunnel/python/config.json @@ -0,0 +1,10 @@ +{ + "rabbitmq_url": "amqp://airavata:airavata@gw56.iu.xsede.org:5672/messaging", + "exchange_name": "simstream", + "queue_name": "test", + "exchange_type": "direct", + "oauth_client_key": "y7xgdnNUx6ifOswJTPcqtzw4aOEa", + "oauth_client_secret": "CgfbuupAPhaOBSBPSScZUWHNANwa", + "oauth_grant_type": "password", + "maximum_cookie_age": 1 +} http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/amqpwstunnel/wstest.html ---------------------------------------------------------------------- diff --git a/sandbox/amqpwstunnel/wstest.html b/sandbox/amqpwstunnel/wstest.html new file mode 100644 index 0000000..eedbf78 --- /dev/null +++ b/sandbox/amqpwstunnel/wstest.html @@ -0,0 +1,157 @@ + + + + + + + AMQP Websockets Test + + + + + + +
+
+ + + + + +
+ +
+ + + + +
+ +
+ +

    Logs

+
+ + + + + + + http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/README.md ---------------------------------------------------------------------- diff --git a/sandbox/simstream/README.md b/sandbox/simstream/README.md new file mode 100755 index 0000000..9ab1379 --- /dev/null +++ b/sandbox/simstream/README.md @@ -0,0 +1,18 @@ +# simstream +A utility for user-defined remote system and simulation data monitoring. + +## Dependencies +* pika >= 0.10.0 (`pip install pika`) +* A running, accessible instance of RabbitMQ server + +## Installation +1. Clone this repository +2. `python setup.py install` + +## Running the Example +The example runs a simple collector that records the maximum memory used by the server (MB) and a timestamp. It also generates a plot of the results. + +1. Edit `example/memory_consumption.py` and `example/memory_streamer.py` with the correct RabbitMQ settings +2. From the repository root, run `python example/memory_consumption.py` +3. Open a new terminal session and run `python example/memory_streamer.py` +4. Memory usage information should now be collected in the current terminal and received in the original terminal http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/README.md ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/README.md b/sandbox/simstream/example/README.md new file mode 100755 index 0000000..23f36d5 --- /dev/null +++ b/sandbox/simstream/example/README.md @@ -0,0 +1,9 @@ +# SimStream Examples + +This directory contains several examples showcasing the functionality of SimStream. To run them, download and install Python 2.7/3.5, install SimStream using setup.py, and modify the settings.json file to match your RabbitMQ server settings. + +## The Examples + +* mem_streamer: Stream max RSS memory consumed by a basic SimStream utility +* logfile_checker: Collect, filter, and stream tagged log file entries +* openmm_example: Run a molecular dynamics simulation and return log information and system state measured by root mean squared deviation \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/README.md ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/README.md b/sandbox/simstream/example/logfile_checker/README.md new file mode 100755 index 0000000..30ed071 --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/README.md @@ -0,0 +1,23 @@ +# SimStream Example: Logfile Streaming + +This example filters log file entries by starting tag and sends them to a remote listener. The listener prints the logs it receives to terminal. + +## Instructions + +### Start the Publisher +1. Open a terminal +2. `cd path/to/simstream/examples/logfile_checker` +3. `python log_streamer.py` + +### Start the Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/logfile_checker` +3. `python log_consumer.py` + +### Write Some Logs +1. Open a terminal +2. `cd path/to/simstream/examples/logfile_checker` +3. `chmod 700 generate_logs.sh` +4. `./generate_logs.sh` + +This will write logs to `test.txt`. The Publisher will continuously check for new logs, filter based on the [STATUS] and [ERROR] tags, and send the filtered results to the RabbitMQ server. The Consumer will receive the filtered log entries and print them to the terminal. http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/generate_logs.sh ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/generate_logs.sh b/sandbox/simstream/example/logfile_checker/generate_logs.sh new file mode 100755 index 0000000..5fb7aa0 --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/generate_logs.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +outfile="test.txt" + +echo "[STATUS] Starting logfile generator" >> $outfile + +sleep 2 + +echo "[STATUS] Doing stuff" >> $outfile +echo "Stuff that doesn't need to be reported" >> $outfile +echo "Stuff that also doesn't need to be reported" >> $outfile +echo "[DATA] 7.267" >> $outfile + +sleep 2 + +echo "[STATUS] Doing more stuff" >> $outfile +echo "Yet more stuff that doesn't need to be reported" >> $outfile +echo "[ERROR] Some non-fatal error that the user should know about" >> $outfile + +sleep 2 + +echo "[STATUS] Finished generating logs" >> $outfile \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/log_consumer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/log_consumer.py b/sandbox/simstream/example/logfile_checker/log_consumer.py new file mode 100755 index 0000000..bf3beac --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/log_consumer.py @@ -0,0 +1,43 @@ +import json +from simstream import PikaAsyncConsumer + +#settings = { +# "url": "amqp://guest:guest@localhost:5672", +# "exchange": "simstream", +# "queue": "test", +# "routing_key": "logfile", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +def print_log_line(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line in lines: + print(line) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + + +consumer = PikaAsyncConsumer( + settings["url"], + settings["exchange"], + settings["queue"], + print_log_line, + exchange_type=settings["exchange_type"], + routing_key=settings["routing_key"] + ) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/log_streamer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/log_streamer.py b/sandbox/simstream/example/logfile_checker/log_streamer.py new file mode 100755 index 0000000..65f84f0 --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/log_streamer.py @@ -0,0 +1,111 @@ +from simstream import SimStream, DataReporter + +import sys, json + +class LogMonitor(object): + """ + A callable class that returns unprocessed lines in an open logfile. + + Instance Variables: + logfile -- the path to the logfile to monitor + """ + + def __init__(self, logfile): + """ + Set up a monitor for a logfile. + + Arguments: + logfile -- the path to the logfile to monitor + """ + self.logfile = logfile + self._generator = None + self._version = sys.version_info[0] + + def __call__(self): + """ + Get the next line from the logfile. + """ + if not self._generator: + self._generator = self._monitor_logfile() + + lines = [] + + line = self._next() + while line is not None: + lines.append(line) + line = self._next() + + return lines + + def _monitor_logfile(self): + """ + Yield the next set of lines from the logfile. + """ + try: + # Make the file persistent for the lifetime of the generator + with open(self.logfile) as f: + f.seek(0,2) # Move to the end of the file + while True: + # Get the next line or indicate the end of the file + line = f.readline() + if line: + yield line.strip() + else: + yield None + + except EnvironmentError as e: + # Handle I/O exceptions in an OS-agnostic way + print("Error: Could not open file %s: %s" % (self.logfile, e)) + + def _next(self): + """ + Python 2/3 agnostic retrieval of generator values. + """ + return self._generator.__next__() if self._version == 3 else self._generator.next() + + +def get_relevant_log_lines(log_lines): + import re + relevant_lines = [] + pattern = r'^\[(STATUS|ERROR)\]' + for line in log_lines: + if re.match(pattern, line) is not None: + relevant_lines.append(line) + return relevant_lines + + +#settings = { +# "url": "amqp://guest:guest@localhost:5672", +# "exchange": "simstream", +# "queue": "test", +# "routing_key": "logfile", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +if __name__ == "__main__": + logfile = sys.argv[1] + log_reporter = DataReporter() + log_reporter.add_collector("logger", + LogMonitor(logfile), + settings["url"], + settings["exchange"], + limit=10, + interval=2, + exchange_type=settings["exchange_type"], + postprocessor=get_relevant_log_lines) + + log_reporter.start_streaming("logger", settings["routing_key"]) + + streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter}) + streamer.setup() + + try: + streamer.start() + except KeyboardInterrupt: + streamer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/remote_log.slurm ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/remote_log.slurm b/sandbox/simstream/example/logfile_checker/remote_log.slurm new file mode 100644 index 0000000..55834e9 --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/remote_log.slurm @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +#SBATCH -J remote_logger # Job name +#SBATCH -o remote_logger.o%j # Name of stdout output file(%j expands to jobId) +#SBATCH -e remote_logger.o%j # Name of stderr output file(%j expands to jobId) +#SBATCH -p development # large queue for jobs > 256 nodes +#SBATCH -t 00:10:00 # Run time (hh:mm:ss) - 1.5 hours +#SBATCH -n 1 # Nodes to use + +module use "/home1/03947/tg832463/modulefiles" +module load openmm + +touch test.txt + +python log_streamer.py test.txt & + +while true; do + bash generate_logs.sh + sleep 5 +done + http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/logfile_checker/test.txt ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/logfile_checker/test.txt b/sandbox/simstream/example/logfile_checker/test.txt new file mode 100755 index 0000000..2ffb48c --- /dev/null +++ b/sandbox/simstream/example/logfile_checker/test.txt @@ -0,0 +1,657 @@ +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs +[STATUS] Starting logfile generator +[STATUS] Doing stuff +Stuff that doesn't need to be reported +Stuff that also doesn't need to be reported +[DATA] 7.267 +[STATUS] Doing more stuff +Yet more stuff that doesn't need to be reported +[ERROR] Some non-fatal error that the user should know about +[STATUS] Finished generating logs http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/mem_streamer/README.md ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/mem_streamer/README.md b/sandbox/simstream/example/mem_streamer/README.md new file mode 100755 index 0000000..897b77a --- /dev/null +++ b/sandbox/simstream/example/mem_streamer/README.md @@ -0,0 +1,17 @@ +# SimStream Example: Memory Usage Streamer + +This example collects data on the memory used by the Publisher and sends that data to the Consumer. + +## Instructions + +### Start the Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/logfile_checker` +3. `python log_consumer.py` + +### Starting the Consumer +1. Open a new terminal +2. `cd path/to/simstream/examples/mem_streamer` +3. `python memory_consumer.py + +The Consumer should receive the memory used by the Publisher (KB) and the time that the data was collected (s since UNIX epoch) at a 2-second interval. http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/mem_streamer/memory_consumption.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/mem_streamer/memory_consumption.py b/sandbox/simstream/example/mem_streamer/memory_consumption.py new file mode 100755 index 0000000..b67e975 --- /dev/null +++ b/sandbox/simstream/example/mem_streamer/memory_consumption.py @@ -0,0 +1,83 @@ +import tornado.ioloop +import tornado.web +import tornado.websocket + +import json + +from simstream import PikaAsyncConsumer, PikaProducer + +#settings = { +# "url": "amqp://localhost:5672", +# "exchange": "simstream", +# "queue": "remote_node", +# "routing_key": "test", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + + +def print_result(body): + try: + data = json.loads(body.decode()) + print("%s: %s" % (data["x"], data["y"])) + except json.decoder.JSONDecodeError as e: + print("[ERROR] Could not decode JSON %s: %s", (body, e)) + except UnicodeError as e: + print("[ERROR] Could not decode message %s: %s" % (body, e.reason)) + +consumer = PikaAsyncConsumer(settings['url'], + settings['exchange'], + settings['queue'], + print_result, + exchange_type=settings['exchange_type'], + routing_key=settings['routing_key']) + +consumer.start() + +# class PlotHandler(tornado.web.RequestHandler): + +# def get(self): +# pass + + +# class StreamingHandler(tornado.websocket.WebSocketHandler): + +# def open(self): +# self.consumer = PikaAsyncConsumer(settings.url, +# settings.exchange, +# settings.queue, +# self.send_data, +# routing_keys=settings.routing_key, +# exchange_type=settings.exchange_type +# ) +# self.producer = PikaProducer("", +# remote_settings.url, +# remote_settings.exchange, +# remote_settings.queue, +# remote_settings.routing_key) + +# def on_message(self, message): +# if hasattr(self, producer) and producer is not None: +# self.producer.send_data(message) + +# def on_close(self): +# self.consumer.stop() +# self.producer.shutdown() +# self.consumer = None +# self.producer = None + +# def send_data(self, ch, method, properties, body): +# self.write_message(body) + +# if __name__ == "__main__": +# app = tornado.web.Application([ +# (r"/plot/(.*)", ) +# (r"/stream/(.*)", StreamingHandler) +# ]) +# app.listen(8888) +# tornado.ioloop.IOLoop.current().start() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/mem_streamer/memory_streamer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/mem_streamer/memory_streamer.py b/sandbox/simstream/example/mem_streamer/memory_streamer.py new file mode 100755 index 0000000..88f0d9a --- /dev/null +++ b/sandbox/simstream/example/mem_streamer/memory_streamer.py @@ -0,0 +1,46 @@ +import resource +import time +import json + +from simstream import SimStream, DataReporter, DataCollector + +#settings = { +# "url": "amqp://localhost:5672", +# "exchange": "simstream", +# "queue": "remote_node", +# "routing_key": "stream_sender", +# "exchange_type": "topic" +#} + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + settings["routing_key"] = "memory" + +def mem_callback(): + return {'x': time.time() * 1000, + 'y': resource.getrusage(resource.RUSAGE_SELF).ru_maxrss} + + +def mem_postprocessor(rss): + rss.y = rss.y / 1000000 + return rss + +mem_reporter = DataReporter() +mem_reporter.add_collector("rss", + mem_callback, + settings["url"], + settings["exchange"], + limit=100, + interval=2, + postprocessor=mem_postprocessor, + ) + +mem_reporter.start_streaming("rss", "test") + +if __name__ == "__main__": + resource_streamer = SimStream(reporters={"memory": mem_reporter}, + config=settings) + resource_streamer.setup() + resource_streamer.start() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/README.md ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/README.md b/sandbox/simstream/example/openmm_example/README.md new file mode 100644 index 0000000..59a0588 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/README.md @@ -0,0 +1,33 @@ +# SimStream Example: Simulating Alanine Dipeptide + +This example runs a simulation of the small molecule Alanine Dipeptide and streams logs and RMSD. RMSD is a metric for judging how similar two molecular states are for the same model. + +## Instructions + +### Installing OpenMM +The easiest way to install OpenMM is to use the Anaconda distribution of Python and run +`conda install -c https://conda.anaconda.org/omnia openmm` + +If you do not wish to use Anaconda, install OpenMM from source by following the instructions in the [OpenMM docs](http://docs.openmm.org/7.0.0/userguide/application.html#installing-openmm "OpenMM documentation") + +### Start the Logfile Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/openmm_example` +3. `python openmm_log_consumer.py` + +### Start the RMSD Consumer +1. Open a terminal +2. `cd path/to/simstream/examples/openmm_example` +3. `python openmm_rmsd_consumer.py` + +### Starting the Producer +1. Open a new terminal +2. `cd path/to/simstream/examples/openmm_example` +3. `python openmm_streamer.py application/sim.out application/trajectory.dcd application/input.pdb application/input.pdb` + +### Starting the Simulation +1. Open a new terminal +2. `cd path/to/simstream/examples/openmm_example/application` +3. `python alanine_dipeptide.py > sim.out` + +The Logfile Consumer should now be printing tagged log entries to the screen; the RMSD Consumer should be printing the calculated RMSD each time the trajectory file is written. http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/application/alanine_dipeptide.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/application/alanine_dipeptide.py b/sandbox/simstream/example/openmm_example/application/alanine_dipeptide.py new file mode 100644 index 0000000..8b22b16 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/application/alanine_dipeptide.py @@ -0,0 +1,55 @@ +########################################################################## +# this script was generated by openmm-builder. to customize it further, +# you can save the file to disk and edit it with your favorite editor. +########################################################################## + +from __future__ import print_function +from simtk.openmm import app +import simtk.openmm as mm +from simtk import unit +from sys import stdout + +print("[START] Application is now running") + +pdb = app.PDBFile('input.pdb') +print("[STATUS] Loaded model") +forcefield = app.ForceField('amber03.xml', 'amber03_obc.xml') +print("[STATUS] Loaded force field") + +system = forcefield.createSystem(pdb.topology, nonbondedMethod=app.NoCutoff, + constraints=None, rigidWater=False) +print("[STATUS] Created system") +integrator = mm.LangevinIntegrator(300*unit.kelvin, 91/unit.picoseconds, + 1.0*unit.femtoseconds) +print("[STATUS] Created integrator") + +try: + platform = mm.Platform.getPlatformByName('CPU') +except Exception as e: + print("[ERROR] Could not load platform CPU. Running Reference") + platform = mm.Platform.getPlatformByName("Reference") + +simulation = app.Simulation(pdb.topology, system, integrator, platform) +print("[STATUS] Set up compute platform") +simulation.context.setPositions(pdb.positions) +print("[STATUS] Set atomic positions") + +print('[STATUS] Minimizing...') +simulation.minimizeEnergy() +print('[STATUS] Equilibrating...') +simulation.step(100) + +simulation.reporters.append(app.DCDReporter('trajectory.dcd', 1000)) +simulation.reporters.append(app.StateDataReporter(stdout, 1000, step=True, + potentialEnergy=True, totalEnergy=True, temperature=True, separator='\t')) +print("[STATUS] Set up reporters") + +print('[STATUS] Running Production...') + +increment = 1000 + +for i in range(0,100000,increment): + print("[STATUS] Step %s" % (i)) + simulation.step(increment) + +print('[END] Done!') http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/application/input.pdb ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/application/input.pdb b/sandbox/simstream/example/openmm_example/application/input.pdb new file mode 100644 index 0000000..a47f196 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/application/input.pdb @@ -0,0 +1,24 @@ +ATOM 1 1HH3 ACE 1 4.300 13.100 8.600 1.00 0.00 +ATOM 2 CH3 ACE 1 5.200 13.600 8.800 1.00 0.00 +ATOM 3 2HH3 ACE 1 4.900 14.300 9.600 1.00 0.00 +ATOM 4 3HH3 ACE 1 5.600 14.200 7.900 1.00 0.00 +ATOM 5 C ACE 1 6.100 12.500 9.400 1.00 0.00 +ATOM 6 O ACE 1 6.400 12.500 10.600 1.00 0.00 +ATOM 7 N ALA 2 6.600 11.600 8.500 1.00 0.00 +ATOM 8 H ALA 2 6.500 11.600 7.500 1.00 0.00 +ATOM 9 CA ALA 2 7.300 10.400 9.100 1.00 0.00 +ATOM 10 HA ALA 2 7.900 10.700 10.000 1.00 0.00 +ATOM 11 CB ALA 2 6.200 9.500 9.600 1.00 0.00 +ATOM 12 HB1 ALA 2 5.700 9.100 8.800 1.00 0.00 +ATOM 13 HB2 ALA 2 6.600 8.700 10.200 1.00 0.00 +ATOM 14 HB3 ALA 2 5.400 10.000 10.200 1.00 0.00 +ATOM 15 C ALA 2 8.400 9.800 8.200 1.00 0.00 +ATOM 16 O ALA 2 8.400 9.900 7.000 1.00 0.00 +ATOM 17 N NME 3 9.300 9.000 8.800 1.00 0.00 +ATOM 18 H NME 3 9.100 9.000 9.800 1.00 0.00 +ATOM 19 CH3 NME 3 10.500 8.400 8.300 1.00 0.00 +ATOM 20 1HH3 NME 3 10.700 7.700 9.100 1.00 0.00 +ATOM 21 2HH3 NME 3 10.400 8.000 7.300 1.00 0.00 +ATOM 22 3HH3 NME 3 11.300 9.100 8.300 1.00 0.00 +TER +ENDMDL http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/application/trajectory.dcd ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/application/trajectory.dcd b/sandbox/simstream/example/openmm_example/application/trajectory.dcd new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/openmm_consumer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/openmm_consumer.py b/sandbox/simstream/example/openmm_example/openmm_consumer.py new file mode 100644 index 0000000..4ba2763 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/openmm_consumer.py @@ -0,0 +1,8 @@ +import json +from simstream import PikaAsyncConsumer + +def recv_log(body): + try: + logs = json.loads(body.decode()) + for log in logs: + print(log) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/openmm_log_consumer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/openmm_log_consumer.py b/sandbox/simstream/example/openmm_example/openmm_log_consumer.py new file mode 100644 index 0000000..e28043f --- /dev/null +++ b/sandbox/simstream/example/openmm_example/openmm_log_consumer.py @@ -0,0 +1,32 @@ +import json +from simstream import PikaAsyncConsumer + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) +settings["routing_key"] = "openmm.log" + +def print_log_line(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line in lines: + print(line) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + +consumer = PikaAsyncConsumer(settings["url"], + settings["exchange"], + "openmm.log", # settings["queue"], + message_handler=print_log_line, + routing_key=settings["routing_key"], + exchange_type=settings["exchange_type"]) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/openmm_rmsd_consumer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/openmm_rmsd_consumer.py b/sandbox/simstream/example/openmm_example/openmm_rmsd_consumer.py new file mode 100644 index 0000000..f5d87c6 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/openmm_rmsd_consumer.py @@ -0,0 +1,36 @@ +import json +from simstream import PikaAsyncConsumer + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) +settings["routing_key"] = "openmm.rmsd" + +def print_rmsd(body): + try: + lines = json.loads(body.decode()) + if lines is not None: + for line in lines: + print(line[0]) + except json.decoder.JSONDecodeError as e: + print("[Error]: Could not decode %s" % (body)) + except UnicodeError as e: + print("[Error]: Could not decode from bytes to string: %s" % (e.reason)) + except IndexError as e: + print("[Error]: List is empty") + except KeyError: + print(lines) + +consumer = PikaAsyncConsumer(settings["url"], + settings["exchange"], + "openmm.rmsd", # settings["queue"], + message_handler=print_rmsd, + routing_key=settings["routing_key"], + exchange_type=settings["exchange_type"]) + +if __name__ == "__main__": + try: + consumer.start() + except KeyboardInterrupt: + consumer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/openmm_stream.slurm ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/openmm_stream.slurm b/sandbox/simstream/example/openmm_example/openmm_stream.slurm new file mode 100644 index 0000000..837e4d4 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/openmm_stream.slurm @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +#SBATCH -J remote_logger # Job name +#SBATCH -o remote_logger.o%j # Name of stdout output file(%j expands to jobId) +#SBATCH -e remote_logger.o%j # Name of stderr output file(%j expands to jobId) +#SBATCH -p development # large queue for jobs > 256 nodes +#SBATCH -t 00:10:00 # Run time (hh:mm:ss) - 1.5 hours +#SBATCH -n 1 # Nodes to use + +#module use "/home1/03947/tg832463/modulefiles" +#module load openmm + +touch test.txt + +python openmm_streamer.py ./application/sim.out ./application/trajectory.dcd ./application/input.pdb ./application/input.pdb & + +cd application +python alanine_dipeptide.py > sim.out +sleep 5 http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/openmm_streamer.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/openmm_streamer.py b/sandbox/simstream/example/openmm_example/openmm_streamer.py new file mode 100644 index 0000000..da95614 --- /dev/null +++ b/sandbox/simstream/example/openmm_example/openmm_streamer.py @@ -0,0 +1,130 @@ +from simstream import SimStream, DataReporter + +import sys, json + +class LogMonitor(object): + """ + A callable class that returns unprocessed lines in an open logfile. + + Instance Variables: + logfile -- the path to the logfile to monitor + """ + + def __init__(self, logfile): + """ + Set up a monitor for a logfile. + + Arguments: + logfile -- the path to the logfile to monitor + """ + self.logfile = logfile + self._generator = None + self._version = sys.version_info[0] + + def __call__(self): + """ + Get the next line from the logfile. + """ + if not self._generator: + self._generator = self._monitor_logfile() + + lines = [] + + line = self._next() + while line is not None: + lines.append(line) + line = self._next() + print(lines) + return lines + + def _monitor_logfile(self): + """ + Yield the next set of lines from the logfile. + """ + try: + # Make the file persistent for the lifetime of the generator + with open(self.logfile) as f: + f.seek(0,2) # Move to the end of the file + while True: + # Get the next line or indicate the end of the file + line = f.readline() + if line: + yield line.strip() + else: + yield None + + except EnvironmentError as e: + # Handle I/O exceptions in an OS-agnostic way + print("Error: Could not open file %s: %s" % (self.logfile, e)) + + def _next(self): + """ + Python 2/3 agnostic retrieval of generator values. + """ + return self._generator.__next__() if self._version == 3 else self._generator.next() + + +def get_relevant_log_lines(log_lines): + import re + relevant_lines = [] + pattern = r'^\[.+\]' + for line in log_lines: + if re.match(pattern, line) is not None: + relevant_lines.append(line) + return relevant_lines + + +def calculate_rmsd(trajectory, topology, reference): + import mdtraj + traj = mdtraj.load(trajectory, top=topology) + ref = mdtraj.load(reference) + rmsd = mdtraj.rmsd(traj, ref) + data = {"step": str(traj.n_frames), "rmsd": str(rmsd[-1])} + return data + +settings = {} + +with open("../settings.json", 'r') as f: + settings = json.load(f) + + +if __name__ == "__main__": + logfile = sys.argv[1] + trajectory = sys.argv[2] + topology = sys.argv[3] + reference = sys.argv[4] + + open(logfile, 'a').close() + open(trajectory, 'a').close() + + log_reporter = DataReporter() + log_reporter.add_collector("logger", + LogMonitor(logfile), + settings["url"], + settings["exchange"], + limit=10, + interval=2, + exchange_type="direct", # settings["exchange_type"], + postprocessor=get_relevant_log_lines) + + log_reporter.start_streaming("logger", "openmm.log") + + rmsd_reporter = DataReporter() + rmsd_reporter.add_collector("rmsd", + calculate_rmsd, + settings["url"], + settings["exchange"], + limit=1, + interval=2, + exchange_type="direct", # settings["exchange_type"], + callback_args=[trajectory, topology, reference]) + + rmsd_reporter.start_streaming("rmsd", "openmm.rmsd") + + streamer = SimStream(config=settings, reporters={"log_reporter": log_reporter, "rmsd_reporter": rmsd_reporter}) + streamer.setup() + + try: + streamer.start() + except KeyboardInterrupt: + streamer.stop() http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/openmm_example/test.txt ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/openmm_example/test.txt b/sandbox/simstream/example/openmm_example/test.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/example/settings.json ---------------------------------------------------------------------- diff --git a/sandbox/simstream/example/settings.json b/sandbox/simstream/example/settings.json new file mode 100644 index 0000000..d354d46 --- /dev/null +++ b/sandbox/simstream/example/settings.json @@ -0,0 +1,6 @@ +{ + "url": "amqp://guest:guest@localhost:5672", + "exchange": "simstream", + "queue": "test", + "exchange_type": "topic" +} http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/setup.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/setup.py b/sandbox/simstream/setup.py new file mode 100755 index 0000000..2f3b3fd --- /dev/null +++ b/sandbox/simstream/setup.py @@ -0,0 +1,19 @@ +""" + Setup for simstream module. + + Author: Jeff Kinnison (jkinniso@nd.edu) +""" + +from setuptools import setup, find_packages + +setup( + name="simstream", + version="0.1dev", + author="Jeff Kinnison", + author_email="jkinniso@nd.edu", + packages=find_packages(), + description="", + install_requires=[ + "pika >= 0.10.0" + ], +) http://git-wip-us.apache.org/repos/asf/airavata/blob/a82e34ec/sandbox/simstream/simstream/__init__.py ---------------------------------------------------------------------- diff --git a/sandbox/simstream/simstream/__init__.py b/sandbox/simstream/simstream/__init__.py new file mode 100755 index 0000000..9d403cb --- /dev/null +++ b/sandbox/simstream/simstream/__init__.py @@ -0,0 +1,11 @@ +""" +Utilties for collecting and distributing system data. + +Author: Jeff Kinnison (jkinniso@nd.edu) +""" + +from .simstream import SimStream +from .datareporter import DataReporter, CollectorExistsException, CollectorDoesNotExistException +from .datacollector import DataCollector +from .pikaasyncconsumer import PikaAsyncConsumer +from .pikaproducer import PikaProducer