atlas-commits mailing list archives

From mad...@apache.org
Subject [08/25] incubator-atlas git commit: ATLAS-1898: initial commit of ODF
Date Wed, 28 Jun 2017 05:57:21 GMT
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md b/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
new file mode 100755
index 0000000..45f272b
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/spark-discovery-service-tutorial.md
@@ -0,0 +1,206 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Tutorial: Creating Spark discovery services
+
+This tutorial shows how to turn an existing [Apache Spark][1] application into an ODF discovery service of type *Spark*. The tutorial is based on the Spark *summary statistics* example application provided with ODF in project `odf-spark-example-application`. It applies the `describe()` method of the Spark [DataSet][2] class, which calculates basic summary statistics on a Spark data frame.
+
+## Introduction
+
+ODF supports Spark applications implemented in Java or Scala. To be used as an ODF discovery service, a Spark application must implement one of the following two interfaces:
+
+* **DataFrame** - intended for Spark applications that process relational tables by using Spark data frames internally.
+* **Generic** - intended for applications that need the full flexibility of ODF.
+
+Both interfaces require the Spark application to implement a specific method (or set of methods) that ODF calls to run the discovery service. This method takes the current Spark context and the data set to be processed as input parameters and returns the annotations to be created. The two interface types are described in detail in separate sections below.
+
+Spark discovery services must be packaged into a single application jar file that contains all required dependencies. Spark libraries, drivers for data access, and the required ODF jar files are implicitly provided by ODF and do not need to be packaged into the application jar file. The jar file may be renamed to *zip* by replacing its extension (not by zipping the jar file) in order to avoid possible security issues when making the file available through tools like [box](https://box.com).
+
+### Configure an ODF Spark cluster
+
+ODF supports access to a local Spark cluster, which can be configured in the `sparkConfig` section of the ODF settings using the ODF REST API or the ODF web application. The parameter `clusterMasterUrl` must point to the master URL of your Spark cluster, e.g. `spark://dyn-9-152-202-64.boeblingen.de.ibm.com:7077`. An optional set of [Spark configuration options](http://spark.apache.org/docs/latest/configuration.html) can be provided in the `configs` parameter as name-value pairs. The ODF test environment comes with a ready-to-use local Spark cluster running on your local system. It can be monitored at the URL `http://localhost:8080/`.
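
For illustration, a `sparkConfig` section might look like the following sketch. The exact JSON shape of the surrounding settings document may differ, and the master host name and the two Spark options (`spark.executor.memory`, `spark.driver.memory`) are placeholder examples:

```json
{
  "sparkConfig": {
    "clusterMasterUrl": "spark://your-spark-master:7077",
    "configs": {
      "spark.executor.memory": "1g",
      "spark.driver.memory": "512m"
    }
  }
}
```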
+
+### Registering a Spark service
+
+A Spark discovery service can be registered using the *Services* tab of the admin Web application or the `/services` endpoint of the [ODF REST API](../swagger/ext-services.html). The following parameters need to be specified to register a service; you may use these example values to register your own instance of the *summary statistics* discovery service:
+
+* Name of the discovery service: `Spark Summary Statistics`
+* Description: `Calculates summary statistics for a given table or data file.`
+* Unique service ID: `spark-summary-statistics`
+* URL of application jar file (may be renamed to zip): `file:///tmp/odf-spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar` (Update link to point to correct location of the file)
+* Name of entry point to be called: `org.apache.atlas.odf.core.spark.SummaryStatistics`
+* Service interface type: `DataFrame`
+
+To try out the *generic* interface, specify the entry point `org.apache.atlas.odf.spark.SparkDiscoveryServiceExample` and service interface type `Generic`.
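
As a sketch, the corresponding JSON payload for the `/services` endpoint might look as follows. The field names are inferred from the `DiscoveryServiceSparkEndpoint` example shown later in this tutorial; the exact wire format is an assumption:

```json
{
  "id": "spark-summary-statistics",
  "name": "Spark Summary Statistics",
  "description": "Calculates summary statistics for a given table or data file.",
  "endpoint": {
    "jar": "file:///tmp/odf-spark/odf-spark-example-application-1.2.0-SNAPSHOT.jar",
    "className": "org.apache.atlas.odf.core.spark.SummaryStatistics",
    "inputMethod": "DataFrame"
  }
}
```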
+
+### Testing the Spark service
+
+In order to test the Spark service, you can use the *DataSets* tab of the ODF admin Web application. Click *START ANALYSIS* next to a relational data set (data file or relational table), then select the newly registered Spark discovery service and click *SUBMIT*. You can browse the resulting annotations by searching for the name of the annotation type in the Atlas metadata repository. The example service creates two types of annotations, *SummaryStatisticsAnnotation* and *SparkTableAnnotation*. *SummaryStatisticsAnnotation* annotates data set columns with the five attributes `count`, `mean`, `stddev`, `min`, and `max` that represent basic statistics of the column values. *SparkTableAnnotation* annotates the data set with a single attribute `count` that represents the number of rows of the data set.
+
+### Developing Spark discovery services
+
+When developing a new discovery service, you may use project `odf-spark-example-application` as a template. Rather than testing your service interactively using the ODF admin web application, it is recommended to create a new test case in class `SparkDiscoveryServiceTest` of project `odf-core`. Two methods need to be added: one for describing the service, the other for running the actual test.
+
+The method that describes the service contains essentially the same parameters that need to be specified when adding a service through the admin webapp. The jar file must be a URL that may point to a local file:
+
+	public static DiscoveryServiceRegistrationInfo getSparkSummaryStatisticsService() {
+		DiscoveryServiceRegistrationInfo regInfo = new DiscoveryServiceRegistrationInfo();
+		regInfo.setId("spark-summary-statistics-example-service");
+		regInfo.setName("Spark summary statistics service");
+		regInfo.setDescription("Example discovery service calling summary statistics Spark application");
+		regInfo.setIconUrl("spark.png");
+		regInfo.setLink("http://www.spark.apache.org");
+		regInfo.setParallelismCount(2);
+		DiscoveryServiceSparkEndpoint endpoint = new DiscoveryServiceSparkEndpoint();
+		endpoint.setJar("file:/tmp/odf-spark-example-application-1.2.0-SNAPSHOT.jar");
+		endpoint.setClassName("org.apache.atlas.odf.core.spark.SummaryStatistics");
+		endpoint.setInputMethod(SERVICE_INTERFACE_TYPE.DataFrame);
+		regInfo.setEndpoint(endpoint);
+		return regInfo;
+	}
+
+The method that runs the actual test retrieves the service description from the above method and specifies what type of data set should be used for testing (data file vs. relational table) and what types of annotations are created by the discovery service. The test automatically applies the required configurations, runs the service, and checks whether new annotations of the respective types have been created. In order to speed up processing, the existing test can be temporarily commented out.  
+
+	@Test
+	public void testLocalSparkClusterWithLocalDataFile() throws Exception{
+		runSparkServiceTest(
+			getLocalSparkConfig(),
+			DATASET_TYPE.DataFile,
+			getSparkSummaryStatisticsService(),
+			new String[] { "SparkSummaryStatisticsAnnotation", "SparkTableAnnotation" }
+		);
+	}
+
+For compiling the test case, the `odf-core` project needs to be built:
+
+	cd ~/git/shared-discovery-platform/odf-core
+	mvn clean install -DskipTests
+
+The test is started implicitly when building the `odf-spark` project:
+
+	cd ~/git/shared-discovery-platform/odf-spark
+	mvn clean install
+
+If something goes wrong, debugging information will be printed to stdout during the test. For speeding up the build and test process, option `-Duse.running.atlas` may be added to the two `mvn` commands. This way, a running Atlas instance will be used instead of starting a new instance every time.
+
+
+### Troubleshooting
+
+Before registering a Spark application in ODF as a new discovery service, it is highly recommended to test the application interactively using the `spark-submit` tool and to check whether the application implements the requested interfaces and produces the expected output format. If the execution of a Spark discovery service fails, you can browse the ODF log for additional information.
+
+## DataFrame interface
+
+The ODF *DataFrame* interface for Spark discovery services has a number of advantages that make it easy to turn an existing Spark application into an ODF discovery service:
+
+* No dependencies on the ODF code, except that a specific method needs to be implemented.
+* No need to care about data access because the data set to be analyzed is provided as Spark data frame.
+* Easy creation of annotations by returning "annotation data frames".   
+
+The simplicity of the DataFrame interface leads to a number of restrictions:
+
+* Only relational data sets can be processed, i.e. data files (OMDataFile) and relational tables (OMTable).
+* Annotations may only consist of a flat list of attributes that represent simple data types, i.e. data structures and references to other data sets are not supported.  
+* Annotations may only be attached to the analyzed relational data set as well as to its columns.
+
+### Method to be implemented
+
+In order to implement the DataFrame interface, the Spark application must implement the following method:
+
+	public static Map<String,Dataset<Row>> processDataFrame(JavaSparkContext sc, DataFrame df, String[] args)
+
+The parameters to be provided to the Spark application are:
+
+* **sc**: The Spark context to be used by the Spark application for performing all Spark operations.
+* **df**: The data set to be analyzed represented by a Spark data frame.
+* **args**: Optional arguments for future use.
+
+### Expected output
+
+The result to be provided by the Spark application must be of type `Map<String,Dataset<Row>>` where `String` represents the type of the annotation to be created and `Dataset<Row>` represents the *annotation data frame* that defines the annotations to be created. If the annotation type does not yet exist, a new annotation type will be dynamically created based on the attributes of the annotation data frame.
+
+The following example describes the format of the annotation data frame. The example uses the BankClientsShort data file provided with ODF. It contains 16 columns with numeric values that represent characteristics of bank clients:
+
+CUST_ID | ACQUIRED | FIRST_PURCHASE_VALUE | CUST_VALUE_SCORE | DURATION_OF_ACQUIRED | CENSOR | ACQ_EXPENSE | ACQ_EXPENSE_SQ | IN_B2B_INDUSTRY | ANNUAL_REVENUE_MIL | TOTAL_EMPLOYEES | RETAIN_EXPENSE | RETAIN_EXPENSE_SQ | CROSSBUY | PURCHASE_FREQ | PURCHASE_FREQ_SQ
+---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
+481 | 0 | 0.0 | 0.0000 | 0 | 0 | 382.32 | 146168.58 | 0 | 56.51 | 264 | 0.00 | 0.0 | 0 | 0 | 0
+482 | 1 | 249.51 | 59.248 | 730 | 1 | 586.61 | 344111.29 | 1 | 35.66 | 355 | 1508.16 | 2274546.59 | 2 | 3 | 9
+483 | 0 | 0.0 | 0.0000 | 0 | 0 | 444.61 | 197678.05 | 1 | 40.42 | 452 | 0.00 | 0.0 | 0 | 0 | 0
+484 | 1 | 351.41 | 77.629 | 730 | 1 | 523.10 | 273633.61 | 1 | 56.36 | 320 | 2526.72 | 6384313.96 | 3 | 12 | 144
+485 | 1 | 460.04 | 76.718 | 730 | 1 | 357.78 | 128006.53 | 1 | 23.53 | 1027 | 2712.48 | 7357547.75 | 2 | 13 | 169
+486 | 1 | 648.6 | 0.0000 | 701 | 0 | 719.61 | 517838.55 | 0 | 59.97 | 1731 | 1460.64 | 2133469.21 | 5 | 11 | 121
+487 | 1 | 352.84 | 63.370 | 730 | 1 | 593.44 | 352171.03 | 1 | 45.08 | 379 | 1324.62 | 1754618.14 | 4 | 8 | 64
+488 | 1 | 193.18 | 0.0000 | 289 | 0 | 840.30 | 706104.09 | 0 | 35.95 | 337 | 1683.83 | 2835283.47 | 6 | 12 | 144
+489 | 1 | 385.14 | 0.0000 | 315 | 0 | 753.13 | 567204.80 | 0 | 58.85 | 745 | 1214.99 | 1476200.7 | 1 | 12 | 144
+
+When applying the *Spark Summary Statistics* service to the table, two annotation data frames will be returned by the service, one for the *SparkSummaryStatistics* annotation type and one for the *SparkTableAnnotation* annotation type. The data frame returned for the *SparkSummaryStatistics* annotation type consists of one column for each attribute of the annotation. In the example, the attributes are `count`, `mean`, `stddev`, `min`, and `max`, standing for the value count, the mean value, the standard deviation, the minimum, and the maximum of each column. Each row represents one annotation to be created. The first column `ODF_ANNOTATED_COLUMN` identifies the column of the input data frame to which the annotation should be assigned.
+
+ODF_ANNOTATED_COLUMN    |count   |                mean |              stddev |       min |       max
+------------------------|--------|---------------------|---------------------|-----------|----------
+              CLIENT_ID |  499.0 |   1764.374749498998 |  108.14436025195488 |    1578.0 |    1951.0
+                    AGE |  499.0 |   54.65130260521042 |  19.924220223453258 |      17.0 |      91.0
+          NBR_YEARS_CLI |  499.0 |  16.847695390781563 |  10.279080097460023 |       0.0 |      48.0
+        AVERAGE_BALANCE |  499.0 |   17267.25809619238 |   30099.68272689043 |  -77716.0 |  294296.0
+             ACCOUNT_ID |  499.0 |   126814.4749498998 |  43373.557241804665 |  101578.0 |  201950.0
+
+If there is no (first) column named `ODF_ANNOTATED_COLUMN`, the annotations will be assigned to the data set rather than to its columns. The following example annotation data frame of type *SparkTableAnnotation* assigns a single attribute `count` to the data set:
+
+| count |
+|-------|
+| 499   |
+
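To make the annotation data frame contract concrete, here is a plain-Java sketch (no Spark required; the class and method names are made up for illustration) that computes the five `describe()` statistics for in-memory columns and assembles one annotation row per column, with `ODF_ANNOTATED_COLUMN` as the first cell:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SummaryStatsSketch {

    // Compute the five describe() statistics for one column of values.
    // Assumes at least two values (stddev is undefined otherwise).
    public static Map<String, Double> describeColumn(double[] values) {
        int n = values.length;
        double sum = 0, min = Double.POSITIVE_INFINITY, max = Double.NEGATIVE_INFINITY;
        for (double v : values) {
            sum += v;
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double mean = sum / n;
        double sq = 0;
        for (double v : values) {
            sq += (v - mean) * (v - mean);
        }
        double stddev = Math.sqrt(sq / (n - 1)); // sample standard deviation
        Map<String, Double> stats = new LinkedHashMap<>();
        stats.put("count", (double) n);
        stats.put("mean", mean);
        stats.put("stddev", stddev);
        stats.put("min", min);
        stats.put("max", max);
        return stats;
    }

    // One annotation row per column: the first cell names the annotated column.
    public static List<Map<String, Object>> annotationRows(Map<String, double[]> table) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map.Entry<String, double[]> col : table.entrySet()) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put("ODF_ANNOTATED_COLUMN", col.getKey());
            row.putAll(describeColumn(col.getValue()));
            rows.add(row);
        }
        return rows;
    }
}
```

Spark's `describe()` reports the sample standard deviation, which is what the sketch mirrors.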
+### Example implementation
+
+The implementation of the *summary statistics* discovery service may be used as a reference implementation for the DataFrame interface. It is available in class `SummaryStatistics` of project `odf-spark-example-application`.
+
+## Generic interface
+
+The *generic* interface provides the full flexibility of ODF discovery services implemented in Java (or Scala):
+
+* No restrictions regarding the types of data sets to be analyzed.
+* Arbitrary objects may be annotated because references to arbitrary objects may be retrieved from the metadata catalog.
+* Annotations may contain nested structures of data types and references to arbitrary objects.
+
+On the downside, the generic interface may be slightly more difficult to use than the DataFrame interface:
+
+* The discovery service must implement a specific ODF interface.
+* Spark RDDs, data frames, etc. must be constructed explicitly (helper methods are available in class `SparkUtils`).
+* Resulting annotations must be explicitly constructed and linked to the annotated objects.
+
+### Methods to be implemented
+
+The Spark application must implement the `SparkDiscoveryService` interface available in ODF project `odf-core-api`:
+
+	public class SparkDiscoveryServiceExample extends SparkDiscoveryServiceBase implements SparkDiscoveryService
+
+The interface consists of the following two methods, which are described in detail in the [Java Docs for ODF services](./apidocs/index.html). For convenience, the `SparkDiscoveryServiceBase` class can be extended, as the `SparkDiscoveryService` interface has many more methods.
+
+#### Actual discovery service logic
+
+This method is called to run the actual discovery service.
+
+	DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request)
+
+#### Validation whether data set can be accessed
+
+This method is called internally before running the actual discovery service.
+
+	DataSetCheckResult checkDataSet(DataSetContainer dataSetContainer)
+
+### Example implementation
+
+The class `SparkDiscoveryServiceExample` in project `odf-spark-example-application` provides an example implementation of a *generic* discovery service. It provides an alternative implementation of the *summary statistics* discovery service.
+
+  [1]: http://spark.apache.org/
+  [2]: http://spark.apache.org/docs/latest/api/java/index.html

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/test-env.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/test-env.md b/odf/odf-doc/src/site/markdown/test-env.md
new file mode 100755
index 0000000..a0697bf
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/test-env.md
@@ -0,0 +1,84 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Test environment
+
+The odf-test-env archive contains a simple test environment for ODF.
+It contains all components to run a simple ODF installation, namely
+
+- Apache Kafka
+- Apache Atlas
+- Jetty (to host the ODF web app)
+- Apache Spark
+
+The test environment is available on Linux and Windows.
+
+## Before you start
+
+Make sure that
+
+- The python executable of Python 2.7 is in your path
+- The environment variable JAVA_HOME is set and points to a proper JDK (not just a JRE!)
+
+
+## *Fast path*: Download and install ODF test environment
+
+If you are running on Linux, you can install the latest ODF test environment by
+downloading the script `download-install-odf-testenv.sh` from
+<a href="https://shared-discovery-platform-jenkins.swg-devops.com:8443/view/1-ODF/job/Open-Discovery-Framework/lastSuccessfulBuild/artifact/odf-test-env/src/main/scripts/download-install-odf-testenv.sh">
+here</a>.
+
+If you call the script with no parameters, it will download, install and start the latest version of the test env.
+The default unpack directory is `~/odf-test-env`.
+
+## Download the test environment manually
+
+You can get the latest version of the test environment from Jenkins
+<a href="https://shared-discovery-platform-jenkins.swg-devops.com:8443/view/1-ODF/job/Open-Discovery-Framework/lastSuccessfulBuild/artifact/odf-test-env/target/odf-test-env-0.1.0-SNAPSHOT-bin.zip">
+here</a>.
+
+## Running the test environment
+
+To start the test environment on Linux, run the script ``odftestenv.sh start``. The script will start four background processes (Zookeeper, Kafka, Atlas, Jetty). To stop the test environment, use the script ``odftestenv.sh stop``.
+
+To start the test environment on Windows, run the script ``start-odf-testenv.bat``.
+This will open four command windows (Zookeeper, Kafka, Atlas, Jetty) with respective window titles. To stop the test environment close all these windows. Note that the `HADOOP_HOME` environment variable needs to be set on Windows as described in the [build documentation](build.md).
+
+
+Once the servers are up and running you will reach the ODF console at
+[https://localhost:58081/odf-web-0.1.0-SNAPSHOT](https://localhost:58081/odf-web-0.1.0-SNAPSHOT).
+
+*Note*: The test environment scripts clean the Zookeeper and Kafka data before starting.
+This means in particular that the configuration will be reset every time you restart it!
+
+Have fun!
+
+## Restart / cleanup
+
+On Linux, the `odftestenv.sh` script has these additional options
+
+- `cleanconfig`: Restart the test env with a clean configuration and clean Kafka topics
+- `cleanmetadata`: Restart with empty metadata
+- `cleanall`: Both `cleanconfig` and `cleanmetadata`.
+
+
+## Additional Information
+
+### Deploying a new version of the ODF war
+Once started you can hot-deploy a new version of the ODF war file simply by copying it
+to the ``odfjettybase/webapps`` folder even while the test environment's Jetty instance is running.
+Note that it may take a couple of seconds before the new app is available.
+
+If you have the ODF build set up, you may want to use the ``deploy-odf-war.bat/.sh`` script for this.
+You must edit the environment variable ``ODF_GIT_DIR`` in this script first to point to your local build directory.

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/markdown/troubleshooting.md
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/markdown/troubleshooting.md b/odf/odf-doc/src/site/markdown/troubleshooting.md
new file mode 100755
index 0000000..971bfc6
--- /dev/null
+++ b/odf/odf-doc/src/site/markdown/troubleshooting.md
@@ -0,0 +1,123 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# Troubleshooting
+
+## ODF
+
+### Debugging using eclipse
+
+You can run Jetty inside Eclipse using the “Eclipse Jetty Feature” (Eclipse -> Help -> Install New Software…).
+Then, create a new debug configuration (Run -> Debug Configurations…) and specify the following settings.
+
+*WebApp* tab:
+
+- Project: `odf-web`
+- WebApp Folder: `../../../../../odf-web/src/main/webapp`
+- Context Path: `/odf-web-0.1.0-SNAPSHOT`
+- HTTP / HTTPs Port: `58081`
+
+*Arguments* tab:
+
+- VM Arguments: `-Dodf.zookeeper.connect=localhost:52181`
+
+As the Eclipse Jetty plugin supports neither secure connections nor basic authentication, remove the `<security-constraint>`
+and `<login-config>` sections from the web.xml.
+The URL of the ODF web app then needs to be prefixed with http:// rather than https://.
+
+Then start Atlas and Kafka via the test env (just comment out the line that starts Jetty, or stop it after it has been started).
+Now you can use the debug configuration in eclipse to start ODF.
+
+See also this [Slack discussion](https://ibm-analytics.slack.com/archives/shared-discovery-pltf/p1467365155000009).
+
+
+### Logs and trace
+ODF uses the ``java.util.logging`` APIs, so if your runtime environment supports
+setting log levels directly, use the respective mechanism.
+
+For runtimes that don't support this out-of-the-box (like Jetty) you can set the JVM system property
+``odf.logspec`` with a value like ``<Level>,<Path>`` which advises ODF to
+write the log with logging level ``<Level>`` to the file under ``<Path>``.
+
+Example:
+
+	-Dodf.logspec=ALL,/tmp/myodflogfile.log
+
+Available log levels are the ones of ``java.util.logging``, namely SEVERE, WARNING, INFO, FINE, FINER, FINEST,
+and ALL.
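
Under the covers this corresponds to plain `java.util.logging` configuration. The following self-contained sketch (the class name and the `demo` helper are made up for illustration; ODF's actual implementation may differ) attaches a `FileHandler` at a given level to the root logger, mirroring the ``<Level>,<Path>`` format:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class OdfLogSpecSketch {

    // Parse a "<Level>,<Path>" spec and attach a file handler at that level
    // to the root logger, similar in spirit to what odf.logspec configures.
    public static void applyLogSpec(String spec) throws IOException {
        String[] parts = spec.split(",", 2);
        Level level = Level.parse(parts[0]);
        FileHandler handler = new FileHandler(parts[1], true);
        handler.setFormatter(new SimpleFormatter());
        handler.setLevel(level);
        Logger root = Logger.getLogger("");
        root.setLevel(level);
        root.addHandler(handler);
    }

    // Small self-test helper: log one FINE message and return the file content.
    public static String demo(String message) throws IOException {
        Path file = Files.createTempFile("odflog", ".log");
        applyLogSpec("ALL," + file);
        Logger.getLogger("odf.demo").fine(message);
        for (Handler h : Logger.getLogger("").getHandlers()) {
            h.flush();
        }
        return new String(Files.readAllBytes(file));
    }
}
```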
+
+
+## Atlas
+
+### Logs
+
+The logs directory contains a bunch of logfiles, together with a file called ``atlas.pid`` which
+contains the process ID of the Atlas server that is currently running.
+In case of issues the file ``logs/application.log`` should be checked first.
+
+### Restarting Atlas
+
+Run these commands (from the Atlas installation directory) to restart Atlas:
+
+	bin/atlas_stop.py
+	bin/atlas_start.py
+
+### Clean all data
+
+To clean the Atlas repository, simply remove the directories ``data`` and ``logs`` before starting.
+
+
+### Issues
+
+#### Service unavailable (Error 503)
+
+Sometimes, calling any Atlas REST API (and the UI) doesn't work and an HTTP error 503 is returned.
+We see this error occasionally and don't know any way to fix it other than cleaning all data and restarting Atlas.
+
+
+#### Creating Atlas objects takes a long time
+
+It takes a long time to create an Atlas object, and after about a minute you see a message like this in the log:
+
+	Unable to update metadata after 60000ms
+
+This is the result of the Kafka queues (which are used for notifications) being in an error state.
+To fix this, restart Atlas (no data cleaning required).
+
+## Kafka / Zookeeper
+
+If there is a problem starting Kafka / Zookeeper, check whether there is a port conflict due to other instances of Kafka / Zookeeper using the default port.
+This might be the case if a more recent version of the IS suite is installed on the system on which you want to run ODF.
+
+Example: If another instance of Zookeeper uses the default port 52181, you need to switch the Zookeeper port by replacing 52181 with a free port number in:
+- start-odf-testenv.bat
+- kafka_2.10-0.8.2.1\config\zookeeper.properties
+- kafka_2.10-0.8.2.1\config\server.properties
+
+### Reset
+
+To reset your Zookeeper / Kafka installation, you will first have to stop the servers:
+
+	bin/kafka-server-stop
+	bin/zookeeper-server-stop
+
+Next, remove the Zookeeper data directory and the Kafka logs directory. Note that "logs"
+in Kafka means the actual data in the topics, not the logfiles.
+You can find which directories to clean in the properties ``dataDir`` in the ``zookeeper.properties``
+file and ``log.dirs`` in ``server.properties``, respectively.
+The defaults are ``/tmp/zookeeper`` and ``/tmp/kafka-logs``.
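
The relevant entries, shown here with their default values, look like this:

```properties
# zookeeper.properties
dataDir=/tmp/zookeeper

# server.properties
log.dirs=/tmp/kafka-logs
```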
+
+Restart the servers with
+
+	bin/zookeeper-server-start config/zookeeper.properties
+	bin/kafka-server-start config/server.properties

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
new file mode 100755
index 0000000..e6ffb46
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>odf.tutorials</groupId>
+	<artifactId>odf-tutorial-discoveryservice</artifactId>
+	<version>1.2.0-SNAPSHOT</version>
+	<packaging>jar</packaging>
+
+	<name>odf-tutorial-discoveryservice</name>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-api</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
new file mode 100755
index 0000000..2899a53
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialAnnotation.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import org.apache.atlas.odf.core.metadata.Annotation;
+
+/*
+ * An example annotation that adds one property to the default annotation class
+ */
+public class ODFTutorialAnnotation extends Annotation {
+
+	private String tutorialProperty;
+
+	public String getTutorialProperty() {
+		return tutorialProperty;
+	}
+
+	public void setTutorialProperty(String tutorialProperty) {
+		this.tutorialProperty = tutorialProperty;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
new file mode 100755
index 0000000..16848ec
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceRequest;
+import org.apache.atlas.odf.api.discoveryservice.DiscoveryServiceResponse.ResponseCode;
+import org.apache.atlas.odf.api.discoveryservice.SyncDiscoveryServiceBase;
+import org.apache.atlas.odf.api.discoveryservice.sync.DiscoveryServiceSyncResponse;
+
+/**
+ * A simple synchronous discovery service that creates one annotation for the data set it analyzes.
+ *
+ */
+public class ODFTutorialDiscoveryService extends SyncDiscoveryServiceBase {
+
+	@Override
+	public DiscoveryServiceSyncResponse runAnalysis(DiscoveryServiceRequest request) {
+		// 1. create an annotation that annotates the data set object passed in the request
+		ODFTutorialAnnotation annotation = new ODFTutorialAnnotation();
+		annotation.setAnnotatedObject(request.getDataSetContainer().getDataSet().getReference());
+		// set a new property called "tutorialProperty" to some string
+		annotation.setTutorialProperty("Tutorial annotation was created on " + new Date());
+
+		// 2. create a response with our annotation created above
+		return createSyncResponse( //
+				ResponseCode.OK, // Everything works OK 
+				"Everything worked", // human-readable message
+				Collections.singletonList(annotation) // new annotations
+		);
+	}
+
+}

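For readers skimming the commit, the synchronous-service pattern above can be sketched without the ODF dependencies. The classes below are simplified, hypothetical stand-ins for the real ODF types (`SyncDiscoveryServiceBase`, `DiscoveryServiceSyncResponse`, the annotation API), not the actual Atlas/ODF classes:

```java
import java.util.Collections;
import java.util.Date;
import java.util.List;

// Hypothetical stand-in for an ODF annotation object.
class Annotation {
    private String annotatedObject;
    private String tutorialProperty;
    void setAnnotatedObject(String ref) { this.annotatedObject = ref; }
    void setTutorialProperty(String p) { this.tutorialProperty = p; }
    String getTutorialProperty() { return tutorialProperty; }
}

// Hypothetical stand-in for DiscoveryServiceSyncResponse.
class SyncResponse {
    final String code;
    final String details;
    final List<Annotation> annotations;
    SyncResponse(String code, String details, List<Annotation> annotations) {
        this.code = code;
        this.details = details;
        this.annotations = annotations;
    }
}

public class SyncServiceSketch {
    // Mirrors runAnalysis(): annotate the incoming data set reference and
    // wrap the new annotation in an OK response.
    static SyncResponse runAnalysis(String dataSetReference) {
        Annotation annotation = new Annotation();
        annotation.setAnnotatedObject(dataSetReference);
        annotation.setTutorialProperty("Tutorial annotation was created on " + new Date());
        return new SyncResponse("OK", "Everything worked",
                Collections.singletonList(annotation));
    }

    public static void main(String[] args) {
        SyncResponse response = runAnalysis("datasets/sample-table");
        System.out.println(response.code + ": " + response.annotations.size() + " annotation(s)");
    }
}
```

The real service differs only in that it extends `SyncDiscoveryServiceBase`, obtains the data set reference from the `DiscoveryServiceRequest`, and builds the response via `createSyncResponse(...)`.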
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
new file mode 100755
index 0000000..2709548
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/main/resources/META-INF/odf/odf-services.json
@@ -0,0 +1,13 @@
+[
+  {
+	"id": "odftutorial.discoveryservicetutorial.ODFTutorialDiscoveryService",
+	"name": "First tutorial service",
+	"description": "The first tutorial service that is synchronous and creates just a single annotation for a data set.",
+	"deletable": true,
+	"endpoint": {
+	  "runtimeName": "Java",
+	  "className": "odftutorial.discoveryservicetutorial.ODFTutorialDiscoveryService"
+	},
+	"iconUrl": "https://www-03.ibm.com/ibm/history/exhibits/logo/images/920911.jpg"
+  }
+]

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
new file mode 100755
index 0000000..1eab53f
--- /dev/null
+++ b/odf/odf-doc/src/site/resources/tutorial-projects/odf-tutorial-discoveryservice/src/test/java/odftutorial/discoveryservicetutorial/ODFTutorialDiscoveryServiceTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package odftutorial.discoveryservicetutorial;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for discovery service
+ */
+public class ODFTutorialDiscoveryServiceTest {
+	
+	@Test
+	public void test() throws Exception {
+		Assert.assertTrue(true);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-doc/src/site/site.xml
----------------------------------------------------------------------
diff --git a/odf/odf-doc/src/site/site.xml b/odf/odf-doc/src/site/site.xml
new file mode 100755
index 0000000..c810e66
--- /dev/null
+++ b/odf/odf-doc/src/site/site.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project name="Open Discovery Framework">
+	<skin>
+		<groupId>org.apache.maven.skins</groupId>
+		<artifactId>maven-fluido-skin</artifactId>
+		<version>1.4</version>
+	</skin>
+	<bannerLeft>
+		<name>Open Discovery Framework</name>
+	</bannerLeft>
+	<custom>
+		<fluidoSkin>
+			<topBarEnabled>false</topBarEnabled>
+			<sideBarEnabled>true</sideBarEnabled>
+		</fluidoSkin>
+	</custom>
+	<body>
+		<links>
+			<item name="Apache Atlas" href="http://atlas.incubator.apache.org" />
+			<item name="Apache Kafka" href="http://kafka.apache.org" />
+		</links>
+		<menu name="Getting Started">
+			<item name="Overview" href="index.html" />
+			<item name="First Steps" href="first-steps.html" />
+			<item name="Build" href="build.html" />
+			<item name="Test Environment" href="test-env.html" />
+		</menu>
+		<menu name="Tutorials">
+			<item name="Install ODF and its prerequisites manually" href="install.html"/>
+			<item name="Run your first ODF analysis" href="first-analysis-tutorial.html"/>
+			<item name="Build and run your first Discovery Service" href="discovery-service-tutorial.html"/>
+			<item name="Creating Spark discovery services" href="spark-discovery-service-tutorial.html"/>
+		</menu>
+		<menu name="Reference">
+			<item name="ODF Metadata API" href="odf-metadata-api.html" />
+			<item name="API reference" href="api-reference.html" />
+			<item name="Troubleshooting" href="troubleshooting.html" />
+		</menu>
+		<menu name="Customization">
+			<item name="Discovery Services" href="discovery-services.html" />
+			<item name="Data Model" href="data-model.html" />
+		</menu>
+		<menu name="Internal">
+			<item name="Jenkins build" href="jenkins-build.html" />
+		</menu>
+		<footer>All rights reserved.</footer>
+	</body>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/.gitignore
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/.gitignore b/odf/odf-messaging/.gitignore
new file mode 100755
index 0000000..94858e5
--- /dev/null
+++ b/odf/odf-messaging/.gitignore
@@ -0,0 +1,19 @@
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+.settings
+target
+.classpath
+.project
+.factorypath
+derby.log

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/pom.xml
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/pom.xml b/odf/odf-messaging/pom.xml
new file mode 100755
index 0000000..95f9d44
--- /dev/null
+++ b/odf/odf-messaging/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~
+~ Licensed under the Apache License, Version 2.0 (the "License");
+~ you may not use this file except in compliance with the License.
+~ You may obtain a copy of the License at
+~
+~   http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>odf-messaging</artifactId>
+	<name>odf-messaging</name>
+
+	<parent>
+		<groupId>org.apache.atlas.odf</groupId>
+		<artifactId>odf</artifactId>
+		<version>1.2.0-SNAPSHOT</version>
+	</parent>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-api</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>0.10.0.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>0.10.0.0</version>
+			<scope>compile</scope>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.12</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.atlas.odf</groupId>
+			<artifactId>odf-core</artifactId>
+			<version>1.2.0-SNAPSHOT</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.derby</groupId>
+			<artifactId>derby</artifactId>
+			<version>10.12.1.1</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.6</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+						<configuration>
+						<!-- remove implementations properties file for test jar -->
+							<excludes>
+								<exclude>org/apache/atlas/odf/odf-implementation.properties</exclude>
+							</excludes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+	<profiles>
+		<profile>
+			<id>all-unit-tests</id>
+			<activation>
+				<activeByDefault>true</activeByDefault>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.build.project.name>${project.name}</odf.build.project.name>
+
+								<!-- additional properties for the test services -->
+						 		<asynctestservice.testparam>sometestvalueforasync</asynctestservice.testparam>
+								<synctestservice.testparam>sometestvalueforsync</synctestservice.testparam>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+
+							<excludes>
+								<exclude>**/integrationtest/**</exclude>
+								<exclude>**/configuration/**</exclude>
+							</excludes>
+						</configuration>
+					</plugin>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-failsafe-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.logspec>${odf.integrationtest.logspec}</odf.logspec>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<includes>
+								<include>**/integrationtest/**/**.java</include>
+							</includes>
+							<excludes>
+								<exclude>**/integrationtest/**/SparkDiscoveryService*</exclude>
+								<exclude>**/integrationtest/**/AnalysisManagerTest.java</exclude>
+							</excludes>
+						</configuration>
+						<executions>
+							<execution>
+								<id>integration-test</id>
+								<goals>
+									<goal>integration-test</goal>
+								</goals>
+							</execution>
+							<execution>
+								<id>verify</id>
+								<goals>
+									<goal>verify</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+		<profile>
+			<id>reduced-tests</id>
+			<activation>
+				<property>
+					<name>reduced-tests</name>
+					<value>true</value>
+				</property>
+			</activation>
+			<build>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-surefire-plugin</artifactId>
+						<version>2.19</version>
+						<configuration>
+							<systemPropertyVariables>
+								<odf.logspec>${odf.unittest.logspec}</odf.logspec>
+								<odf.zookeeper.connect>${testZookeepeConnectionString}</odf.zookeeper.connect>
+								<odf.build.project.name>${project.name}</odf.build.project.name>
+							</systemPropertyVariables>
+							<dependenciesToScan>
+								<dependency>org.apache.atlas.odf:odf-core</dependency>
+							</dependenciesToScan>
+							<excludes>
+								<exclude>**/KafkaQueueManagerTest.java</exclude>
+								<exclude>**/ShutdownTest.java</exclude>
+								<exclude>**/MultiPartitionConsumerTest.java</exclude>
+								<exclude>**/integrationtest/**/SparkDiscoveryService*</exclude>
+								<exclude>**/integrationtest/**/AnalysisManagerTest.java</exclude>
+								<exclude>**/configuration/**</exclude>
+							</excludes>
+						</configuration>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
+
+</project>

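The surefire and failsafe configurations above hand values such as `odf.zookeeper.connect` and `odf.logspec` to the test JVM via `<systemPropertyVariables>`; a test then reads them with `System.getProperty`. A minimal, self-contained sketch of that pattern (the value is set locally here for illustration, not taken from a real build):

```java
public class SystemPropertyDemo {
    public static void main(String[] args) {
        // In a real build, surefire injects this via <systemPropertyVariables>;
        // we set it ourselves so the example runs standalone.
        System.setProperty("odf.zookeeper.connect", "localhost:2181");

        // Supplying a default keeps the code usable outside the Maven build.
        String zookeeperConnect =
                System.getProperty("odf.zookeeper.connect", "localhost:52181");
        System.out.println("Connecting tests to ZooKeeper at " + zookeeperConnect);
    }
}
```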
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
new file mode 100755
index 0000000..c9c95cc
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaMonitor.java
@@ -0,0 +1,545 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.atlas.odf.api.engine.BrokerNode;
+import org.apache.atlas.odf.api.engine.KafkaBrokerPartitionMessageCountInfo;
+import org.apache.atlas.odf.api.engine.KafkaPartitionInfo;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.wink.json4j.JSONException;
+
+import org.apache.atlas.odf.api.engine.PartitionOffsetInfo;
+import org.apache.atlas.odf.json.JSONUtils;
+
+import kafka.admin.AdminClient;
+import kafka.admin.AdminClient.ConsumerSummary;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.GroupCoordinatorRequest;
+import kafka.api.GroupCoordinatorResponse;
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.EndPoint;
+import kafka.common.ErrorMapping;
+import kafka.common.OffsetAndMetadata;
+import kafka.common.OffsetMetadata;
+import kafka.common.OffsetMetadataAndError;
+import kafka.common.TopicAndPartition;
+import kafka.coordinator.GroupOverview;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetCommitRequest;
+import kafka.javaapi.OffsetCommitResponse;
+import kafka.javaapi.OffsetFetchRequest;
+import kafka.javaapi.OffsetFetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import kafka.network.BlockingChannel;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+public class KafkaMonitor {
+	private final static String CLIENT_ID = "odfMonitorClient";
+
+	private Logger logger = Logger.getLogger(KafkaMonitor.class.getName());
+
+	//this only works for consumer groups managed by the Kafka coordinator (unlike Kafka < 0.9, where consumers were managed by ZooKeeper)
+	public List<String> getConsumerGroups(String zookeeperHost, String topic) {
+		List<String> result = new ArrayList<String>();
+		try {
+			List<String> brokers = getBrokers(zookeeperHost);
+			StringBuilder brokersParam = new StringBuilder();
+			final Iterator<String> iterator = brokers.iterator();
+			while (iterator.hasNext()) {
+				brokersParam.append(iterator.next());
+				if (iterator.hasNext()) {
+					brokersParam.append(";");
+				}
+			}
+			Properties props = new Properties();
+			props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokersParam.toString());
+			final AdminClient client = AdminClient.create(props);
+			final Map<Node, scala.collection.immutable.List<GroupOverview>> javaMap = JavaConversions.mapAsJavaMap(client.listAllConsumerGroups());
+			for (Entry<Node, scala.collection.immutable.List<GroupOverview>> entry : javaMap.entrySet()) {
+				for (GroupOverview group : JavaConversions.seqAsJavaList(entry.getValue())) {
+					for (ConsumerSummary summary : JavaConversions.seqAsJavaList(client.describeConsumerGroup(group.groupId()))) {
+						for (TopicPartition part : JavaConversions.seqAsJavaList(summary.assignment())) {
+							if (part.topic().equals(topic) && !result.contains(group.groupId())) {
+								result.add(group.groupId());
+								break;
+							}
+						}
+					}
+				}
+			}
+		} catch (Exception ex) {
+			logger.log(Level.WARNING, "An error occurred retrieving the consumer groups", ex);
+		}
+		return result;
+	}
+
+	private ZkUtils getZkUtils(String zookeeperHost, ZkClient zkClient) {
+		return new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false);
+	}
+
+	private ZkClient getZkClient(String zookeeperHost) {
+		return new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+	}
+
+	public boolean setOffset(String zookeeperHost, String consumerGroup, String topic, int partition, long offset) {
+		logger.info("set offset for " + consumerGroup + " " + offset);
+		long now = System.currentTimeMillis();
+		Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
+		final TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+		offsets.put(topicAndPartition, new OffsetAndMetadata(new OffsetMetadata(offset, "Manually set offset"), now, -1));
+		int correlationId = 0;
+		OffsetCommitRequest req = new OffsetCommitRequest(consumerGroup, offsets, correlationId++, CLIENT_ID, (short) 1);
+		final BlockingChannel channel = getOffsetManagerChannel(zookeeperHost, consumerGroup);
+		channel.send(req.underlying());
+		OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().payload());
+		if (commitResponse.hasError()) {
+			logger.warning("Could not commit offset! " + topic + ":" + partition + "-" + offset + " error: " + commitResponse.errorCode(topicAndPartition));
+			channel.disconnect();
+			return false;
+		} else {
+			logger.info("offset committed successfully");
+			channel.disconnect();
+			return true;
+		}
+	}
+
+	public List<String> getBrokers(String zookeeperHost) {
+		List<String> result = new ArrayList<String>();
+		ZkClient zkClient = getZkClient(zookeeperHost);
+		List<Broker> brokerList = JavaConversions.seqAsJavaList(getZkUtils(zookeeperHost, zkClient).getAllBrokersInCluster());
+		Iterator<Broker> brokerIterator = brokerList.iterator();
+		while (brokerIterator.hasNext()) {
+			for (Entry<SecurityProtocol, EndPoint> entry : JavaConversions.mapAsJavaMap(brokerIterator.next().endPoints()).entrySet()) {
+				String connectionString = entry.getValue().connectionString();
+				//remove protocol from string
+				connectionString = connectionString.split("://")[1];
+				result.add(connectionString);
+			}
+		}
+		zkClient.close();
+		return result;
+	}
+
+	public PartitionOffsetInfo getOffsetsOfLastMessagesForTopic(String zookeeperHost, String topic, int partition) {
+		List<String> kafkaBrokers = getBrokers(zookeeperHost);
+		return getOffsetsOfLastMessagesForTopic(kafkaBrokers, topic, partition);
+	}
+
+	public PartitionOffsetInfo getOffsetsOfLastMessagesForTopic(final List<String> kafkaBrokers, final String topic, final int partition) {
+		logger.entering(this.getClass().getName(), "getOffsetsOfLastMessagesForTopic");
+
+		final PartitionOffsetInfo info = new PartitionOffsetInfo();
+		info.setOffset(-1L);
+		info.setPartitionId(partition);
+
+		final CountDownLatch subscribeAndPollLatch = new CountDownLatch(2);
+
+		final Thread consumerThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				Properties kafkaConsumerProps = getKafkaConsumerProps(kafkaBrokers);
+				final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaConsumerProps);
+				final TopicPartition topicPartition = new TopicPartition(topic, partition);
+				consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+
+					@Override
+					public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+						// nothing to do when partitions are revoked
+					}
+
+					@Override
+					public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+						subscribeAndPollLatch.countDown();
+					}
+				});
+				logger.info("poll records from kafka for offset retrieval");
+
+				final ConsumerRecords<String, String> poll = consumer.poll(500);
+				List<ConsumerRecord<String, String>> polledRecords = poll.records(topicPartition);
+				logger.info("polled records: " + poll.count());
+				if (!polledRecords.isEmpty()) {
+					ConsumerRecord<String, String> record = polledRecords.get(polledRecords.size() - 1);
+					info.setMessage(record.value());
+					info.setOffset(record.offset());
+					info.setPartitionId(partition);
+					logger.info("polled last offset: " + record.offset());
+				}
+				subscribeAndPollLatch.countDown();
+				consumer.close();
+			}
+		});
+		logger.info("start retrieval of offset");
+		consumerThread.start();
+
+		try {
+			boolean result = subscribeAndPollLatch.await(5000, TimeUnit.MILLISECONDS);
+			if (result) {
+				logger.info("Subscribed and retrieved offset on time: " + JSONUtils.toJSON(info));
+			} else {
+				logger.warning("Could not subscribe and retrieve offset on time " + JSONUtils.toJSON(info));
+				consumerThread.interrupt();
+			}
+		} catch (InterruptedException | JSONException e) {
+			logger.log(Level.WARNING, "An error occurred retrieving the last offset", e);
+		}
+
+		return info;
+	}
+
+	protected Properties getKafkaConsumerProps(List<String> kafkaBrokers) {
+		Properties kafkaConsumerProps = new Properties();
+		kafkaConsumerProps.put("group.id", "OffsetRetrieverConsumer" + UUID.randomUUID().toString());
+		kafkaConsumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		kafkaConsumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+		kafkaConsumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+
+		StringBuilder brokers = new StringBuilder();
+		final Iterator<String> iterator = kafkaBrokers.iterator();
+		while (iterator.hasNext()) {
+			brokers.append(iterator.next());
+			if (iterator.hasNext()) {
+				brokers.append(",");
+			}
+		}
+		kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers.toString());
+		kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+		return kafkaConsumerProps;
+	}
+
+	public List<KafkaBrokerPartitionMessageCountInfo> getMessageCountForTopic(String zookeeperHost, String topic) {
+		logger.entering(this.getClass().getName(), "getMessageCountForTopic");
+		List<KafkaBrokerPartitionMessageCountInfo> result = new ArrayList<KafkaBrokerPartitionMessageCountInfo>();
+
+		List<Integer> partitions = getPartitionIdsForTopic(zookeeperHost, topic);
+
+		List<String> kafkaBrokers = getBrokers(zookeeperHost);
+		for (int cnt = 0; cnt < kafkaBrokers.size(); cnt++) {
+			String broker = kafkaBrokers.get(cnt);
+			logger.info("getMessageCountForTopic from broker: " + broker);
+			KafkaBrokerPartitionMessageCountInfo container = new KafkaBrokerPartitionMessageCountInfo();
+			container.setBroker(broker);
+
+			String[] splitBroker = broker.split(":");
+			String host = splitBroker[0];
+			String port = splitBroker[1];
+			SimpleConsumer consumer = new SimpleConsumer(host, Integer.valueOf(port), 100000, 64 * 1024, "leaderLookup");
+			Map<Integer, Long> partitionCountMap = new HashMap<Integer, Long>();
+
+			for (Integer partition : partitions) {
+				logger.info("broker: " + broker + ", partition " + partition);
+				partitionCountMap.put(partition, null);
+				FetchRequest req = new FetchRequestBuilder().clientId(CLIENT_ID).addFetch(topic, partition, 0, 100000).build();
+				FetchResponse fetchResponse = consumer.fetch(req);
+
+				if (fetchResponse.hasError()) {
+					//in case of a broker error, do nothing. The broker has no information about the partition so we continue with the next one.
+					if (fetchResponse.errorCode(topic, partition) == ErrorMapping.NotLeaderForPartitionCode()) {
+						logger.info("broker " + broker + " is not leader for partition " + partition + ", cannot retrieve MessageCountForTopic");
+					} else {
+						logger.warning("broker: " + broker + ", partition " + partition + " has error: " + fetchResponse.errorCode(topic, partition));
+					}
+					continue;
+				}
+
+				long numRead = 0;
+				long readOffset = numRead;
+
+				for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
+					long currentOffset = messageAndOffset.offset();
+					if (currentOffset < readOffset) {
+						logger.info("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
+						continue;
+					}
+					readOffset = messageAndOffset.nextOffset();
+					numRead++;
+				}
+
+				logger.info("broker: " + broker + ", partition " + partition + " total messages: " + numRead);
+				partitionCountMap.put(partition, numRead);
+			}
+			consumer.close();
+			container.setPartitionMsgCountMap(partitionCountMap);
+			result.add(container);
+		}
+
+		return result;
+	}
+
+	/**
+	 * @param group
+	 * @param topic
+	 * @return a list of partitions and their offsets. If no offset is found, it is returned as -1
+	 */
+	public List<PartitionOffsetInfo> getOffsetsForTopic(String zookeeperHost, String group, String topic) {
+		BlockingChannel channel = getOffsetManagerChannel(zookeeperHost, group);
+
+		List<Integer> partitionIds = getPartitionIdsForTopic(zookeeperHost, topic);
+		List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>();
+		int correlationId = 0;
+		for (Integer id : partitionIds) {
+			TopicAndPartition testPartition0 = new TopicAndPartition(topic, id);
+			partitions.add(testPartition0);
+		}
+
+		OffsetFetchRequest fetchRequest = new OffsetFetchRequest(group, partitions, (short) 1 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper
+				correlationId++, CLIENT_ID);
+
+		List<PartitionOffsetInfo> offsetResult = new ArrayList<PartitionOffsetInfo>();
+		int retryCount = 0;
+		//it is possible that a ConsumerCoordinator is not available yet, if this is the case we need to wait and try again.
+		boolean done = false;
+		while (retryCount < 5 && !done) {
+			offsetResult = new ArrayList<PartitionOffsetInfo>();
+			retryCount++;
+			channel.send(fetchRequest.underlying());
+			OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload());
+
+			boolean errorFound = false;
+			for (TopicAndPartition part : partitions) {
+				if (part.topic().equals(topic)) {
+					PartitionOffsetInfo offsetInfo = new PartitionOffsetInfo();
+					offsetInfo.setPartitionId(part.partition());
+					OffsetMetadataAndError result = fetchResponse.offsets().get(part);
+					short offsetFetchErrorCode = result.error();
+					if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) {
+						channel.disconnect();
+						String msg = "Offset could not be fetched: the broker used is not the coordinator for this consumer group";
+						offsetInfo.setMessage(msg);
+						logger.warning(msg);
+						errorFound = true;
+						break;
+					} else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) {
+						logger.warning("Offset could not be fetched at this point, the offsets are not available yet");
+						try {
+							Thread.sleep(2000);
+						} catch (InterruptedException e) {
+							e.printStackTrace();
+						}
+						//Offsets are not available yet. Wait and try again
+						errorFound = true;
+						break;
+					} else if (result.error() != ErrorMapping.NoError()) {
+						String msg = MessageFormat.format("Offset could not be fetched at this point, an unknown error occurred ( {0} )", result.error());
+						offsetInfo.setMessage(msg);
+						logger.warning(msg);
+					} else {
+						long offset = result.offset();
+						offsetInfo.setOffset(offset);
+					}
+
+					offsetResult.add(offsetInfo);
+				}
+			}
+			if (!errorFound) {
+				done = true;
+			}
+		}
+
+		if (channel.isConnected()) {
+			channel.disconnect();
+		}
+		return offsetResult;
+	}
+
+	public List<TopicMetadata> getMetadataForTopic(String zookeeperHost, String kafkaTopic) {
+		//connecting to a single broker is sufficient because every broker holds the full cluster metadata
+		for (String brokerHost : getBrokers(zookeeperHost)) {
+			brokerHost = brokerHost.replace("PLAINTEXT://", "");
+			String[] splitBroker = brokerHost.split(":");
+			String ip = splitBroker[0];
+			String port = splitBroker[1];
+
+			//It is possible that a ConsumerCoordinator is not available yet; in this case we need to wait and try again.
+			SimpleConsumer consumer = null;
+			try {
+				consumer = new SimpleConsumer(ip, Integer.valueOf(port), 100000, 64 * 1024, "leaderLookup");
+				int retryCount = 0;
+				boolean done = false;
+				while (retryCount < 5 && !done) {
+					retryCount++;
+
+					List<String> topics = Collections.singletonList(kafkaTopic);
+					TopicMetadataRequest req = new TopicMetadataRequest(topics);
+					kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+					List<TopicMetadata> metaData = resp.topicsMetadata();
+
+					boolean errorFound = false;
+					for (TopicMetadata item : metaData) {
+						if (item.topic().equals(kafkaTopic)) {
+							if (item.errorCode() == ErrorMapping.LeaderNotAvailableCode()) {
+								//wait and try again
+								errorFound = true;
+								try {
+									Thread.sleep(2000);
+								} catch (InterruptedException e) {
+									// Restore the interrupt status instead of swallowing it
+									Thread.currentThread().interrupt();
+								}
+								break;
+							}
+							return metaData;
+						}
+					}
+
+					if (!errorFound) {
+						done = true;
+					}
+				}
+			} finally {
+				if (consumer != null) {
+					consumer.close();
+				}
+			}
+		}
+		return null;
+	}
+
+	public List<Integer> getPartitionsForTopic(String zookeeperHost, String topic) {
+		ZkClient zkClient = new ZkClient(zookeeperHost, 5000, 5000, ZKStringSerializer$.MODULE$);
+		Map<String, Seq<Object>> partitions = JavaConversions
+				.mapAsJavaMap(new ZkUtils(zkClient, new ZkConnection(zookeeperHost), false).getPartitionsForTopics(JavaConversions.asScalaBuffer(Arrays.asList(topic)).toList()));
+		List<Object> partitionObjList = JavaConversions.seqAsJavaList(partitions.entrySet().iterator().next().getValue());
+		List<Integer> partitionsList = new ArrayList<Integer>();
+		for (Object partObj : partitionObjList) {
+			partitionsList.add((Integer) partObj);
+		}
+		zkClient.close();
+		return partitionsList;
+	}
+
+	public List<KafkaPartitionInfo> getPartitionInfoForTopic(String zookeeperHost, String topic) {
+		List<TopicMetadata> topicInfos = getMetadataForTopic(zookeeperHost, topic);
+		List<KafkaPartitionInfo> partitionInfoList = new ArrayList<KafkaPartitionInfo>();
+		for (TopicMetadata topicInfo : topicInfos) {
+			for (PartitionMetadata part : topicInfo.partitionsMetadata()) {
+				KafkaPartitionInfo info = new KafkaPartitionInfo();
+				info.setPartitionId(part.partitionId());
+
+				List<BrokerNode> partitionNodes = new ArrayList<BrokerNode>();
+				for (BrokerEndPoint brokerPoint : part.isr()) {
+					BrokerNode node = new BrokerNode();
+					node.setHost(brokerPoint.connectionString());
+					node.setLeader(brokerPoint.connectionString().equals(part.leader().connectionString()));
+					partitionNodes.add(node);
+				}
+				info.setNodes(partitionNodes);
+				partitionInfoList.add(info);
+			}
+		}
+		//partition information has been collected for all topics
+		return partitionInfoList;
+	}
+
+	public List<Integer> getPartitionIdsForTopic(String zookeeperHost, String topic) {
+		List<TopicMetadata> metadata = getMetadataForTopic(zookeeperHost, topic);
+
+		List<Integer> partitionsList = new ArrayList<Integer>();
+		if (metadata != null && metadata.size() > 0) {
+			for (PartitionMetadata partData : metadata.get(0).partitionsMetadata()) {
+				partitionsList.add(partData.partitionId());
+			}
+		}
+
+		return partitionsList;
+	}
+
+	private BlockingChannel getOffsetManagerChannel(String zookeeperHost, String group) {
+		int correlationId = 0;
+		for (String broker : getBrokers(zookeeperHost)) {
+			String[] splitBroker = broker.split(":");
+			String ip = splitBroker[0];
+			String port = splitBroker[1];
+
+			int retryCount = 0;
+			//It is possible that a ConsumerCoordinator is not available yet; in this case we need to wait and try again.
+			while (retryCount < 5) {
+				retryCount++;
+
+				BlockingChannel channel = new BlockingChannel(ip, Integer.valueOf(port), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(),
+						5000 /* read timeout in millis */);
+				channel.connect();
+				channel.send(new GroupCoordinatorRequest(group, OffsetRequest.CurrentVersion(), correlationId++, CLIENT_ID));
+				GroupCoordinatorResponse metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());
+
+				if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
+					BrokerEndPoint endPoint = metadataResponse.coordinatorOpt().get();
+					if (!endPoint.host().equals(ip) || endPoint.port() != Integer.parseInt(port)) { // reconnect if the coordinator is a different broker
+						channel.disconnect();
+						channel = new BlockingChannel(endPoint.host(), endPoint.port(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000);
+						channel.connect();
+					}
+					return channel;
+				} else if (metadataResponse.errorCode() == ErrorMapping.ConsumerCoordinatorNotAvailableCode()
+						|| metadataResponse.errorCode() == ErrorMapping.OffsetsLoadInProgressCode()) {
+					//wait and try again
+					try {
+						Thread.sleep(2000);
+					} catch (InterruptedException e) {
+						// Restore the interrupt status instead of swallowing it
+						Thread.currentThread().interrupt();
+					}
+				} else {
+					//unknown error, continue with next broker
+					break;
+				}
+			}
+		}
+		throw new RuntimeException("Kafka Consumer Broker not available!");
+	}
+}
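The `KafkaMonitor` methods above all share one bounded retry pattern: attempt the operation up to five times and sleep 2 s whenever the coordinator or the offsets are not available yet. As an aside (not part of this commit), that pattern can be factored into a small generic helper; the class and method names below are illustrative only:

```java
import java.util.concurrent.Callable;

/** Hypothetical helper illustrating the bounded retry pattern used in KafkaMonitor. */
public class BoundedRetry {

	/**
	 * Invokes the task up to maxAttempts times, sleeping sleepMillis between
	 * failed attempts. Returns the first successful result, or rethrows the
	 * last exception if every attempt fails.
	 */
	public static <T> T retry(Callable<T> task, int maxAttempts, long sleepMillis) throws Exception {
		Exception last = null;
		for (int attempt = 1; attempt <= maxAttempts; attempt++) {
			try {
				return task.call();
			} catch (Exception e) {
				last = e;
				if (attempt < maxAttempts) {
					try {
						Thread.sleep(sleepMillis);
					} catch (InterruptedException ie) {
						Thread.currentThread().interrupt(); // preserve interrupt status
						throw ie;
					}
				}
			}
		}
		throw last;
	}

	public static void main(String[] args) throws Exception {
		final int[] calls = {0};
		// Succeeds on the third attempt, mirroring coordinator-not-ready retries.
		String result = retry(() -> {
			calls[0]++;
			if (calls[0] < 3) throw new IllegalStateException("coordinator not available yet");
			return "ok";
		}, 5, 10);
		System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
	}
}
```

Centralizing the loop would also make the retry count and back-off interval configurable in one place instead of being hard-coded per method.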

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
new file mode 100755
index 0000000..33c4ae0
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaProducerManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.TimeoutException;
+
+import org.apache.atlas.odf.core.Environment;
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.messaging.MessageEncryption;
+import org.apache.atlas.odf.api.ODFFactory;
+import org.apache.atlas.odf.api.settings.SettingsManager;
+
+public class KafkaProducerManager {
+
+	private final static Logger logger = Logger.getLogger(KafkaProducerManager.class.getName());
+	private static KafkaProducer<String, String> producer;
+
+	protected Properties getKafkaProducerConfig() {
+		SettingsManager odfConfig = new ODFFactory().create().getSettingsManager();
+		ODFInternalFactory f = new ODFInternalFactory();
+		Properties props = odfConfig.getKafkaProducerProperties();
+		String zookeeperConnect = f.create(Environment.class).getZookeeperConnectString();
+		final Iterator<String> brokers = f.create(KafkaMonitor.class).getBrokers(zookeeperConnect).iterator();
+		StringBuilder brokersString = new StringBuilder();
+		while (brokers.hasNext()) {
+			brokersString.append(brokers.next());
+			if (brokers.hasNext()) {
+				brokersString.append(",");
+			}
+		}
+		logger.info("Sending messages to brokers: " + brokersString.toString());
+		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersString.toString());
+		props.put(ProducerConfig.CLIENT_ID_CONFIG, "ODF_MESSAGE_PRODUCER");
+		return props;
+	}
+
+	private KafkaProducer<String, String> getProducer() {
+		if (producer == null) {
+			producer = new KafkaProducer<String, String>(getKafkaProducerConfig());
+		}
+		return producer;
+	}
+
+	public void sendMsg(String topicName, String key, String value) {
+		MessageEncryption msgEncryption = new ODFInternalFactory().create(MessageEncryption.class);
+		value = msgEncryption.encrypt(value);
+		sendMsg(topicName, key, value, null);
+	}
+
+	public void sendMsg(final String topicName, final String key, final String value, final Callback callback) {
+		ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topicName, key, value);
+		try {
+			int retryCount = 0;
+			boolean msgSent = false;
+			while (retryCount < 5 && !msgSent) {
+				try {
+					getProducer().send(producerRecord, callback).get(4000, TimeUnit.MILLISECONDS);
+					msgSent = true;
+				} catch (ExecutionException ex) {
+					if (ex.getCause() instanceof TimeoutException) {
+						logger.warning("Message could not be sent within 4000 ms");
+						retryCount++;
+					} else {
+						throw ex;
+					}
+				}
+			}
+			if (retryCount == 5) {
+				logger.warning("Message could not be sent within 5 retries!");
+				logger.fine("topic: " + topicName + " key " + key + " msg " + value);
+			}
+		} catch (Exception exc) {
+			logger.log(Level.WARNING, "Exception while sending message", exc);
+			if (producer != null) {
+				producer.close();
+			}
+			producer = null;
+			throw new RuntimeException(exc);
+		}
+	}
+
+}
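`sendMsg` above bounds each `producer.send()` by a 4-second `get()` and retries up to five times on timeout; note that Kafka wraps its own `TimeoutException` in an `ExecutionException`, which is why the original inspects `getCause()`. The same policy can be sketched without a Kafka dependency using a plain `ExecutorService`; everything below (class name, task type) is illustrative, not part of the commit:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedSendRetry {

	// Mirrors KafkaProducerManager.sendMsg's policy: wait up to timeoutMs per
	// attempt, retry up to maxRetries times on timeout, report failure otherwise.
	static boolean sendWithRetries(ExecutorService pool, Callable<Void> send,
			int maxRetries, long timeoutMs) throws Exception {
		for (int attempt = 0; attempt < maxRetries; attempt++) {
			Future<Void> f = pool.submit(send);
			try {
				f.get(timeoutMs, TimeUnit.MILLISECONDS);
				return true; // acknowledged within the timeout
			} catch (TimeoutException te) {
				f.cancel(true); // abandon this attempt and retry
			}
		}
		return false; // all attempts timed out
	}

	public static void main(String[] args) throws Exception {
		ExecutorService pool = Executors.newSingleThreadExecutor();
		// A "send" that completes instantly succeeds on the first try.
		boolean ok = sendWithRetries(pool, () -> null, 5, 100);
		System.out.println("sent=" + ok); // prints "sent=true"
		pool.shutdownNow();
	}
}
```

One difference worth noting: this sketch catches `java.util.concurrent.TimeoutException` thrown by `Future.get` itself, whereas the Kafka client reports broker-side timeouts via the future's cause.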

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6d19e129/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
----------------------------------------------------------------------
diff --git a/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
new file mode 100755
index 0000000..d0cf704
--- /dev/null
+++ b/odf/odf-messaging/src/main/java/org/apache/atlas/odf/core/messaging/kafka/KafkaQueueConsumer.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.odf.core.messaging.kafka;
+
+import java.text.MessageFormat;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import org.apache.atlas.odf.core.ODFInternalFactory;
+import org.apache.atlas.odf.core.controlcenter.ODFRunnable;
+import org.apache.atlas.odf.core.controlcenter.QueueMessageProcessor;
+import org.apache.atlas.odf.core.messaging.MessageEncryption;
+
+import kafka.consumer.ConsumerTimeoutException;
+
+public class KafkaQueueConsumer implements ODFRunnable {
+	private Logger logger = Logger.getLogger(KafkaQueueConsumer.class.getName());
+	final static int POLLING_DURATION_MS = 100;
+	public static final int MAX_PROCESSING_EXCEPTIONS = 3;
+	public final static int MAX_CONSUMPTION_EXCEPTIONS = 5;
+	
+	public static interface ConsumptionCallback {
+		boolean stopConsumption();
+	}
+
+	private boolean ready = false;
+
+	private String topic;
+	private KafkaConsumer<String, String> kafkaConsumer;
+	private Properties config;
+	private boolean isShutdown = false;
+	private ExecutorService executorService;
+	private QueueMessageProcessor requestConsumer;
+	private int consumptionExceptionCount = 0;
+	private ConsumptionCallback consumptionCallback;
+
+	public KafkaQueueConsumer(String topicName, Properties config, QueueMessageProcessor requestConsumer) {
+		this(topicName, config, requestConsumer, null);
+	}
+	
+	public KafkaQueueConsumer(String topicName, Properties config, QueueMessageProcessor requestConsumer, ConsumptionCallback consumptionCallback) {
+		this.topic = topicName;
+		this.config = config;
+		this.requestConsumer = requestConsumer;
+		this.consumptionCallback = consumptionCallback;
+		if (this.consumptionCallback == null) {
+			this.consumptionCallback = new ConsumptionCallback() {
+
+				@Override
+				public boolean stopConsumption() {
+					// default: never stop
+					return false;
+				}
+				
+			};
+		}
+	}
+
+	public void run() {
+		final String groupId = this.config.getProperty("group.id");
+		while (consumptionExceptionCount < MAX_CONSUMPTION_EXCEPTIONS && !isShutdown) {
+			try {
+				logger.info("Starting consumption for " + groupId);
+				startConsumption();
+			} catch (RuntimeException ex) {
+				if (ex.getCause() instanceof WakeupException) {
+					isShutdown = true;
+				} else {
+					consumptionExceptionCount++;
+					logger.log(Level.WARNING, "Caught exception in KafkaQueueConsumer " + groupId + ", restarting consumption!", ex);
+				}
+				if (this.kafkaConsumer != null) {
+					this.kafkaConsumer.close();
+					this.kafkaConsumer = null;
+				}
+			} catch (Exception e) {
+				consumptionExceptionCount++;
+				logger.log(Level.WARNING, "Caught exception in KafkaQueueConsumer " + groupId + ", restarting consumption!", e);
+				if (this.kafkaConsumer != null) {
+					this.kafkaConsumer.close();
+					this.kafkaConsumer = null;
+				}
+			}
+		}
+		logger.info("Stopping consumption for " + groupId);
+		this.ready = false;
+		this.cancel();
+	}
+
+	private void startConsumption() {
+		if (this.consumptionCallback.stopConsumption()) {
+			return;
+		}
+		Exception caughtException = null;
+		final String logPrefix = this + " consumer: [" + this.requestConsumer.getClass().getSimpleName() + "], on " + topic + ": ";
+		try {
+			if (this.kafkaConsumer == null) {
+				logger.fine(logPrefix + " create new consumer for topic " + topic);
+				try {
+					this.kafkaConsumer = new KafkaConsumer<String, String>(config);
+					kafkaConsumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+
+						@Override
+						public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+							logger.fine(logPrefix + " partitions revoked " + topic + " new partitions: " + partitions.size());
+						}
+
+						@Override
+						public void onPartitionsAssigned(Collection<TopicPartition> partitions) {						
+							logger.finer(logPrefix + " partitions assigned " + topic + " , new partitions: " + partitions.size());
+							logger.info(logPrefix + "consumer is ready with " + partitions.size() + " partitions assigned");
+							ready = true;
+						}
+					});
+				} catch (ZkTimeoutException zkte) {
+					String zkHosts = config.getProperty("zookeeper.connect");
+					logger.log(Level.SEVERE, logPrefix + " Could not connect to the Zookeeper instance at ''{0}''. Please ensure that Zookeeper is running", zkHosts);
+					throw zkte;
+				}
+			}
+			logger.log(Level.INFO, logPrefix + " Consumer " + "''{1}'' is now listening on ODF queue ''{0}'' with configuration {2}", new Object[] { topic, requestConsumer, config });
+			MessageEncryption msgEncryption = new ODFInternalFactory().create(MessageEncryption.class);
+			while (!Thread.interrupted() && !isShutdown && kafkaConsumer != null) {
+				if (this.consumptionCallback.stopConsumption()) {
+					isShutdown = true;
+					break;
+				}
+				ConsumerRecords<String, String> records = kafkaConsumer.poll(POLLING_DURATION_MS);
+				kafkaConsumer.commitSync(); // commit offset immediately to avoid timeouts for long running processors
+				for (TopicPartition partition : kafkaConsumer.assignment()) {
+					List<ConsumerRecord<String, String>> polledRecords = records.records(partition);
+					//		logger.log(Level.FINEST, logPrefix + "Polling finished got {0} results, continue processing? {1}", new Object[] { polledRecords.size(), continueProcessing });
+					if (!polledRecords.isEmpty()) {
+						logger.fine(polledRecords.get(0).value() + " offset: " + polledRecords.get(0).offset());
+					}
+
+					for (int no = 0; no < polledRecords.size(); no++) {
+						ConsumerRecord<String, String> record = polledRecords.get(no);
+						String s = record.value();
+						logger.log(Level.FINEST, logPrefix + "Decrypting message {0}", s);
+						try {
+							s = msgEncryption.decrypt(s);
+						} catch (Exception exc) {
+							logger.log(Level.WARNING, "Message could not be decrypted, ignoring it", exc);
+							s = null;
+						}
+						if (s != null) {
+							logger.log(Level.FINEST, logPrefix + "Sending message to consumer ''{0}''", s);
+							int exceptionCount = 0;
+							boolean processedSuccessfully = false;
+							while (exceptionCount < MAX_PROCESSING_EXCEPTIONS && !processedSuccessfully) {
+								try {
+									exceptionCount++;
+									this.requestConsumer.process(executorService, s, record.partition(), record.offset());
+									processedSuccessfully = true;
+								} catch (Exception ex) {
+									logger.log(Level.WARNING, "Exception " + exceptionCount + " caught processing message!", ex);
+								}
+							}
+						}
+					}
+				}
+			}
+		} catch (ConsumerTimeoutException e) {
+			String msg = MessageFormat.format(" Caught timeout on queue ''{0}''", topic);
+			logger.log(Level.WARNING, logPrefix + msg, e);
+			caughtException = e;
+		} catch (Exception exc) {
+			String msg = MessageFormat.format(" Caught exception on queue ''{0}''", topic);
+			logger.log(Level.WARNING, logPrefix + msg, exc);
+			caughtException = exc;
+		} finally {
+			if (kafkaConsumer != null) {
+				logger.log(Level.FINE, logPrefix + "Closing consumer " + " on topic ''{0}''", topic);
+				kafkaConsumer.close();
+				logger.log(Level.FINE, logPrefix + "Closed consumer " + " on topic ''{0}''", topic);
+				kafkaConsumer = null;
+			}
+		}
+		logger.log(Level.INFO, logPrefix + "Finished consumer on topic ''{0}''", topic);
+		if (caughtException != null) {
+			caughtException.printStackTrace();
+			throw new RuntimeException(caughtException);
+		}
+	}
+
+	public void cancel() {
+		logger.log(Level.INFO, "Shutting down consumer on topic ''{0}''", topic);
+		if (this.kafkaConsumer != null) {
+			this.kafkaConsumer.wakeup();
+		}
+		isShutdown = true;
+	}
+
+	public boolean isShutdown() {
+		return isShutdown;
+	}
+
+	@Override
+	public void setExecutorService(ExecutorService service) {
+		this.executorService = service;
+	}
+
+	@Override
+	public boolean isReady() {
+		return ready;
+	}
+
+}
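A subtle consequence of the loop in `startConsumption` is the `commitSync()` placed immediately after `poll()`: offsets are committed before the records are processed, which avoids session timeouts for long-running processors but yields at-most-once delivery. The toy model below (no Kafka dependency; all names illustrative) makes that trade-off concrete:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitBeforeProcessDemo {

	// In-memory stand-in for a partition log; illustrates why committing the
	// offset right after poll() (as KafkaQueueConsumer does) is at-most-once.
	static int committedOffset = 0;

	static List<String> poll(List<String> log, int max) {
		int end = Math.min(committedOffset + max, log.size());
		List<String> batch = new ArrayList<>(log.subList(committedOffset, end));
		committedOffset = end; // commitSync() happens here, before processing
		return batch;
	}

	public static void main(String[] args) {
		List<String> log = Arrays.asList("m0", "m1", "m2");
		List<String> batch = poll(log, 2);
		// The offset is already committed; if processing of m0/m1 fails now,
		// a restarted consumer resumes at offset 2 and never re-sees them.
		System.out.println(batch + " committed=" + committedOffset); // prints "[m0, m1] committed=2"
	}
}
```

Committing after successful processing instead would give at-least-once delivery, at the cost of possible duplicates and of rebalances if processing outruns the session timeout.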

