Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 37B94200B82 for ; Fri, 16 Sep 2016 18:11:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 368AD160AC4; Fri, 16 Sep 2016 16:11:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 21EDC160AB7 for ; Fri, 16 Sep 2016 18:11:37 +0200 (CEST) Received: (qmail 23651 invoked by uid 500); 16 Sep 2016 16:11:37 -0000 Mailing-List: contact commits-help@airavata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airavata.apache.org Delivered-To: mailing list commits@airavata.apache.org Received: (qmail 23640 invoked by uid 99); 16 Sep 2016 16:11:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Sep 2016 16:11:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE985E0A05; Fri, 16 Sep 2016 16:11:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lahiru@apache.org To: commits@airavata.apache.org Date: Fri, 16 Sep 2016 16:11:36 -0000 Message-Id: <5faaaaa3b9dd425fad38b8eb98f97876@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/48] airavata git commit: add simstream module to airavata [Forced Update!] archived-at: Fri, 16 Sep 2016 16:11:39 -0000 Repository: airavata Updated Branches: refs/heads/lahiru/AIRAVATA-2065 ce26536fc -> 6786f8ee0 (forced update) http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/pikaproducer.py ---------------------------------------------------------------------- diff --git a/modules/simstream/simstream/pikaproducer.py b/modules/simstream/simstream/pikaproducer.py new file mode 100755 index 0000000..6ffaf3d --- /dev/null +++ b/modules/simstream/simstream/pikaproducer.py @@ -0,0 +1,202 @@ +""" +Utilties for sending data. + +Author: Jeff Kinnison (jkinniso@nd.edu) +""" + +import json +import pika + + +class PikaProducer(object): + """ + Utility for sending job data to a set of endpoints. + """ + + def __init__(self, rabbitmq_url, exchange, exchange_type="direct", routing_keys=[]): + """ + Instantiate a new PikaProducer. + + Arguments: + rabbitmq_url -- the url of the RabbitMQ server to send to + exchange -- the name of the exchange to send to + + Keyword Arguments: + exchange_type -- one of one of 'direct', 'topic', 'fanout', 'headers' + (default 'direct') + routing_key -- the routing keys to the endpoints for this producer + (default []) + """ + self._url = rabbitmq_url + self._exchange = exchange + self._exchange_type = exchange_type + self._routing_keys = routing_keys + + self._connection = None # RabbitMQ connection object + self._channel = None # RabbitMQ channel object + + import random + self._name = random.randint(0,100) + + def __call__(self, data): + """ + Publish data to the RabbitMQ server. + + Arguments: + data -- JSON serializable data to send + """ + if self._connection is None: # Start the connection if it is inactive + self.start() + else: # Serialize and send the data + message = self.pack_data(data) + self.send_data(message) + + def add_routing_key(self, key): + """ + Add a new endpoint that will receive this data. + + Arguments: + key -- the routing key for the new endpoint + """ + if key not in self._routing_keys: + #print("Adding key %s to %s" % (key, self._name)) + self._routing_keys.append(key) + #print(self._routing_keys) + + def remove_routing_key(self, key): + """ + Stop sending data to an existing endpoint. + + Arguments: + key -- the routing key for the existing endpoint + """ + try: + self._routing_keys.remove(key) + except ValueError: + pass + + def pack_data(self, data): + """ + JSON-serialize the data for transport. + + Arguments: + data -- JSON-serializable data + """ + try: # Generate a JSON string from the data + msg = json.dumps(data) + except TypeError as e: # Generate and return an error if serialization fails + msg = json.dumps({"err": str(e)}) + finally: + return msg + + def send_data(self, data): + """ + Send the data to all active endpoints. + + Arguments: + data -- the message to send + """ + if self._channel is not None: # Make sure the connection is active + for key in self._routing_keys: # Send to all endpoints + #print(self._exchange, key, self._name) + self._channel.basic_publish(exchange = self._exchange, + routing_key=key, + body=data) + + def start(self): + """ + Open a connection if one does not exist. + """ + print("Starting new connection") + if self._connection is None: + print("Creating connection object") + self._connection = pika.BlockingConnection(pika.URLParameters(self._url)) + self._channel = self._connection.channel() + self._channel.exchange_declare(exchange=self._exchange, + type=self._exchange_type) + + def shutdown(self): + """ + Close an existing connection. + """ + if self._channel is not None: + self._channel.close() + + def _on_connection_open(self, unused_connection): + """ + Create a new channel if the connection opens successful. + + Arguments: + unused_connection -- a reference to self._connection + """ + print("Connection is open") + self._connection.channel(on_open_callback=self._on_channel_open) + + def _on_connection_close(self, connection, code, text): + """ + Actions to take when the connection is closed for any reason. + + Arguments: + connection -- the connection that was closed (same as self._connection) + code -- response code from the RabbitMQ server + text -- response body from the RabbitMQ server + """ + print("Connection is closed") + self._channel = None + self._connection = None + + def _on_channel_open(self, channel): + """ + Actions to take when the channel opens. + + Arguments: + channel -- the newly opened channel + """ + print("Channel is open") + self._channel = channel + self._channel.add_on_close_callback(self._on_channel_close) + self._declare_exchange() + + def _on_channel_close(self, channel, code, text): + """ + Actions to take when the channel closes for any reason. + + Arguments: + channel -- the channel that was closed (same as self._channel) + code -- response code from the RabbitMQ server + text -- response body from the RabbitMQ server + """ + print("Channel is closed") + self._connection.close() + + def _declare_exchange(self): + """ + Set up the exchange to publish to even if it already exists. + """ + print("Exchange is declared") + self._channel.exchange_declare(exchange=self._exchange, + type=self.exchange_type) + +if __name__ == "__main__": + import time + + config = { + "url": "amqp://guest:guest@localhost:5672", + "exchange": "simstream", + "routing_key": "test_consumer", + "exchange_type": "topic" + } + + producer = PikaProducer(config["url"], + config["exchange"], + exchange_type=config["exchange_type"], + routing_keys=[config["routing_key"]]) + producer.start() + + while True: + try: + time.sleep(5) + data = str(time.time()) + ": Hello SimStream" + producer.send_data(data) + except KeyboardInterrupt: + producer.shutdown() http://git-wip-us.apache.org/repos/asf/airavata/blob/24ced800/modules/simstream/simstream/simstream.py ---------------------------------------------------------------------- diff --git a/modules/simstream/simstream/simstream.py b/modules/simstream/simstream/simstream.py new file mode 100755 index 0000000..499a8c3 --- /dev/null +++ b/modules/simstream/simstream/simstream.py @@ -0,0 +1,167 @@ +import pika + +from .pikaasyncconsumer import PikaAsyncConsumer +from .datacollector import DataCollector +from .datareporter import DataReporter +from .eventhandler import EventHandler +from .eventmonitor import EventMonitor + + +class ReporterExistsException(Exception): + """Thrown when attempting to add a DataReporter with a conflicting name""" + pass + + +class SimStream(object): + """ + Manager for routing messages to their correct reporter. + """ + + DEFAULT_CONFIG_PATH="simstream.cnf" + + + class MessageParser(object): + """ + Internal message parsing facilities. + """ + + def __init__(self): + self.parsed = None + + def __call__(self, message): + pass + + + def __init__(self, reporters={}, config={}): + self.reporters = reporters + self.consumer = None + self.config = config + + def add_data_reporter(self, reporter): + """ + Add a new DataReporter object. + + Arguments: + reporter -- the DataReporter to add + """ + if reporter.name in self.reporters: + raise ReporterExistsException + self.reporters[reporter.name] = reporter + + def parse_config(self): + """ + Read the config file and set up the specified, data collection and + event handling resources. + """ + # TODO: Read in config + # TODO: Set up configuration dict + pass + + def route_message(self, message): + """ + Send a message to the correct reporter. + """ + # TODO: Create new MessageParser + # TODO: Run message through MessageParser + # TODO: Route message to the correct DataReporter/EventMonitor + parser = MessageParser() + parser(message) + if parser.reporter_name in self.reporters: + self.reporters[parser.reporter_name].start_streaming( + parser.collector_name, + parser.routing_key + ) + + def start_collecting(self): + """ + Begin collecting data and monitoring for events. + """ + for reporter in self.reporters: + self.reporters[reporter].start_collecting() + + def setup(self): + """ + Set up the SimStream instance: create DataCollectors, create + EventMonitors, configure AMQP consumer. + """ + self.parse_config() + #self.setup_consumer() + self.setup_data_collection() + self.setup_event_monitoring() + + def setup_data_collection(self): + """ + Set up all DataReporters and DataCollectors. + """ + # TODO: Create and configure all DataReporters + # TODO: Create and configure all DataCollectors + # TODO: Assign each DataCollector to the correct DataReporter + if "reporters" in self.config: + for reporter in self.config.reporters: + pass + for collector in self.config.collectors: + pass + + def setup_event_monitoring(self): + #TODO: Create and configure all EventMonitors + #TODO: Create and configure all EventHandlers + #TODO: Assign each EventHandler to the correct EventMonitor + #TODO: Assign each EventMonitor to the correct DataCollector + pass + + def setup_consumer(self): + """ + Set up and configure the consumer. + """ + if len(self.config) > 0 and self.consumer is None: + if "message_handler" in self.config: + message_handler = self.config["message_handler"] + else: + message_handler = self.route_message + self.consumer = PikaAsyncConsumer(self.config["url"], + self.config["exchange"], + self.config["queue"], + message_handler, + exchange_type=self.config["exchange_type"], + routing_key=self.config["routing_key"] + ) + + def start(self): + """ + Configure and start SimStream. + """ + if self.consumer is None: + self.setup() + self.start_collecting() + #self.consumer.start() + + def stop(self): + """ + Stop all data collection, event monitoring, and message consumption. + """ + self.consumer.stop() + self.stop_collecting() + + +if __name__ == "__main__": + def print_message(message): + with open("test.out", "w") as f: + print(message) + + print(SimStream.DEFAULT_CONFIG_PATH) + + config = { + "url": "amqp://guest:guest@localhost:5672", + "exchange": "simstream", + "queue": "simstream_test", + "message_handler": print_message, + "routing_key": "test_consumer", + "exchange_type": "topic" + } + + streamer = SimStream(config=config) + + try: + streamer.start() + except KeyboardInterrupt: + streamer.stop()