edgent-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dlab...@apache.org
Subject [14/54] [abbrv] [partial] incubator-quarks git commit: add "org.apache." prefix to edgent package names
Date Thu, 21 Jul 2016 13:17:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/console/servlets/src/main/java/org/apache/edgent/console/servlets/MetricsUtil.java
----------------------------------------------------------------------
diff --git a/console/servlets/src/main/java/org/apache/edgent/console/servlets/MetricsUtil.java b/console/servlets/src/main/java/org/apache/edgent/console/servlets/MetricsUtil.java
new file mode 100644
index 0000000..bca6cc6
--- /dev/null
+++ b/console/servlets/src/main/java/org/apache/edgent/console/servlets/MetricsUtil.java
@@ -0,0 +1,378 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.console.servlets;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.edgent.console.servlets.MetricsGson.OpMetric;
+import org.apache.edgent.console.servlets.MetricsGson.Operator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class MetricsUtil {
+	
+	static MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+	private static final Logger logger = LoggerFactory.getLogger(MetricsUtil.class);
+
+	static Iterator<ObjectInstance> getCounterObjectIterator(String jobId) {
+		ObjectName counterObjName = null;
+		StringBuffer sbuf = new StringBuffer();
+		sbuf.append("*:jobId=" + jobId);
+		sbuf.append(",type=metric.counters,*");
+
+        // i.e, edgent.providers.development:jobId=JOB-0,opId=OP_4,name=TupleRateMeter.edgent.oplet.JOB_0.OP_4,type=metric.meters
+		try {
+			counterObjName = new ObjectName(sbuf.toString());
+		} catch (MalformedObjectNameException e) {
+		    logger.error("Error caught while initializing ObjectName", e);
+		}
+		Set<ObjectInstance> counterInstances = mBeanServer.queryMBeans(counterObjName, null);
+		return counterInstances.iterator();
+		
+	}
+	static Iterator<ObjectInstance> getMeterObjectIterator(String jobId) {
+		ObjectName meterObjName = null;
+			
+			StringBuffer sbuf1 = new StringBuffer();
+			sbuf1.append("*:jobId=" + jobId);
+			sbuf1.append(",type=metric.meters,*");
+
+			try {
+				meterObjName = new ObjectName(sbuf1.toString());
+			} catch (MalformedObjectNameException e) {
+			    logger.error("Error caught while initializing ObjectName", e);
+			}
+			
+
+		Set<ObjectInstance> meterInstances = mBeanServer.queryMBeans(meterObjName, null);
+		// return only those beans that are part of the job
+		return meterInstances.iterator();
+	}
+	
+	static MetricsGson getAvailableMetricsForJob(String jobId, Iterator<ObjectInstance> meterIterator, Iterator<ObjectInstance> counterIterator) {
+		MetricsGson gsonJob = new MetricsGson();
+		ArrayList<Operator> counterOps = new ArrayList<Operator>();
+		gsonJob.setJobId(jobId);
+		while (meterIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance meterInstance = (ObjectInstance)meterIterator.next();
+			ObjectName mObjName = meterInstance.getObjectName(); 
+			String opName = mObjName.getKeyProperty("opId");
+
+			Operator anOp = null;
+				if (!opName.equals("")) {
+					MBeanInfo mBeanInfo = null;
+					try {
+						mBeanInfo = mBeanServer.getMBeanInfo(mObjName);
+					} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+					    logger.error("Exception caught while getting MBeanInfo", e);
+					}
+
+			    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+			    		OpMetric aMetric = gsonJob.new OpMetric();
+		    			aMetric.type = "meter";
+		    			aMetric.name = attributeInfo.getName();
+				 
+	    				try {
+	    					aMetric.value = String.valueOf(mBeanServer.getAttribute(mObjName, aMetric.name));
+	    				} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+							| ReflectionException e) {
+	    				    logger.error("Exception caught while accessing MBean", e);
+	    				}
+		    			
+		    			// if the op associated with this metric is not in the job add it
+		    			if (!gsonJob.isOpInJob(opName)) {
+						    anOp = gsonJob.new Operator();
+						    gsonJob.addOp(anOp);
+						    anOp.opId = opName;
+						    counterOps.add(anOp);
+						    metrics = new ArrayList<OpMetric>();
+		    			} 
+		    			metrics.add(aMetric);
+			    	}
+			    	gsonJob.setOpMetrics(anOp, metrics);
+				}
+		}
+
+		while (counterIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance counterInstance = (ObjectInstance)counterIterator.next();
+			ObjectName cObjName = counterInstance.getObjectName();
+			String opName1 = cObjName.getKeyProperty("opId");
+
+			Operator anOp = null;
+			if (!opName1.equals("")) {
+			MBeanInfo mBeanInfo = null;
+			try {
+				mBeanInfo = mBeanServer.getMBeanInfo(cObjName);
+			} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+			    logger.error("Exception caught while getting MBeanInfo", e);
+			}
+
+	    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+	    		OpMetric aMetric = gsonJob.new OpMetric();
+    			aMetric.type = "counter";
+    			aMetric.name = attributeInfo.getName();
+				try {
+					aMetric.value = String.valueOf(mBeanServer.getAttribute(cObjName, aMetric.name));
+				} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+					| ReflectionException e) {
+				    logger.error("Exception caught while accessing MBean", e);
+				}
+    			Operator theOp = gsonJob.getOp(opName1);
+				if (theOp == null) {
+					anOp = gsonJob.new Operator();
+					gsonJob.addOp(anOp);
+					anOp.opId = opName1;
+					metrics = new ArrayList<OpMetric>();
+					gsonJob.setOpMetrics(anOp, metrics);
+				} else {
+					// get the op
+					metrics = theOp.metrics;
+				}
+    			metrics.add(aMetric);
+	    	}
+			}
+		}
+		
+		return gsonJob;
+
+	}
+	
+	/**
+	 * Get all the rate metrics, i.e, one minute rate, fifteen minute rate, mean rate, etc  for a job
+	 * @param job id (e.g, "JOB_0")
+	 * 
+	 * @return  all metrics for this job if there are any
+	 */
+	static MetricsGson getAllRateMetrics(String jobId) {
+		MetricsGson gsonJob = new MetricsGson();
+		gsonJob.setJobId(jobId);
+		
+		Iterator<ObjectInstance> meterIterator = MetricsUtil.getMeterObjectIterator(jobId);
+		while (meterIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance meterInstance = (ObjectInstance)meterIterator.next();
+			ObjectName mObjName = meterInstance.getObjectName();
+			//i.e, edgent.providers.development:jobId=JOB-0,opId=OP_4,name=TupleRateMeter.edgent.oplet.JOB_0.OP_4,type=metric.meters
+			String jobName = mObjName.getKeyProperty("jobId");
+			String opName = mObjName.getKeyProperty("opId");
+			Operator anOp = null;
+
+			if (jobId.equals(jobName)) {
+				MBeanInfo mBeanInfo = null;
+			
+				try {
+					mBeanInfo = mBeanServer.getMBeanInfo(mObjName);
+				} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+				    logger.error("Exception caught while getting MBeanInfo", e);
+				}
+				
+		    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+		    		String name = attributeInfo.getName();
+    				OpMetric aMetric = gsonJob.new OpMetric();
+    				aMetric.name = name;
+    				aMetric.type = attributeInfo.getType();
+    				try {
+						aMetric.value = String.valueOf(mBeanServer.getAttribute(mObjName, name));
+    				} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+							| ReflectionException e) {
+					    logger.error("Exception caught while accessing MBean", e);
+    				}
+					 if (!gsonJob.isOpInJob(opName)) {
+					    anOp = gsonJob.new Operator();
+					    gsonJob.addOp(anOp);
+					    anOp.opId = opName;
+					    metrics = new ArrayList<OpMetric>();
+					    gsonJob.setOpMetrics(anOp, metrics);
+					 } else {
+						anOp = gsonJob.getOp(opName);
+						metrics = anOp.metrics;
+					 }
+					 metrics.add(aMetric);
+		    	}
+			}
+		}
+		
+		Iterator<ObjectInstance> counterIterator = MetricsUtil.getCounterObjectIterator(jobId);
+		while (counterIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance counterInstance = (ObjectInstance)counterIterator.next();
+			ObjectName cObjName = counterInstance.getObjectName();
+			String opName1 = cObjName.getKeyProperty("opId");
+
+			Operator anOp = null;
+			if (!opName1.equals("")) {
+			MBeanInfo mBeanInfo = null;
+			try {
+				mBeanInfo = mBeanServer.getMBeanInfo(cObjName);
+			} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+			    logger.error("Exception caught while getting MBeanInfo", e);
+			}
+
+	    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+	    		OpMetric aMetric = gsonJob.new OpMetric();
+    			aMetric.type = "counter";
+    			aMetric.name = attributeInfo.getName();
+				try {
+					aMetric.value = String.valueOf(mBeanServer.getAttribute(cObjName, aMetric.name));
+				} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+					| ReflectionException e) {
+				    logger.error("Exception caught while accessing MBean", e);
+				}
+    			Operator theOp = gsonJob.getOp(opName1);
+				if (theOp == null) {
+					anOp = gsonJob.new Operator();
+					gsonJob.addOp(anOp);
+					anOp.opId = opName1;
+					metrics = new ArrayList<OpMetric>();
+					gsonJob.setOpMetrics(anOp, metrics);
+				} else {
+					// get the op
+					metrics = theOp.metrics;
+				}
+    			metrics.add(aMetric);
+	    	}
+			}
+		}
+		return gsonJob;
+	}
+	// format for metricName is "name:RateUnit,type:meter"
+	static MetricsGson getMetric(String jobId, String metricName, Iterator<ObjectInstance> metricIterator, Iterator<ObjectInstance> counterIterator) {
+
+		MetricsGson gsonJob = new MetricsGson();
+		gsonJob.setJobId(jobId);
+		String[] desiredParts = metricName.split(",");
+		String[] nameA = new String[2];
+		String desName = "";
+		if (!desiredParts[0].equals("")) {
+			nameA = desiredParts[0].split(":");
+			desName = nameA[1];
+		}
+		
+		while (metricIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance meterInstance = (ObjectInstance)metricIterator.next();
+			ObjectName mObjName = meterInstance.getObjectName();
+			//i.e, edgent.providers.development:jobId=JOB-0,opId=OP_4,name=TupleRateMeter.edgent.oplet.JOB_0.OP_4,type=metric.meters
+			String jobName = mObjName.getKeyProperty("jobId");
+			String opName = mObjName.getKeyProperty("opId");
+			Operator anOp = null;
+
+			if (jobId.equals(jobName)) {
+				MBeanInfo mBeanInfo = null;
+			
+				try {
+					mBeanInfo = mBeanServer.getMBeanInfo(mObjName);
+				} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+				    logger.error("Exception caught while getting MBeanInfo", e);
+				}
+				
+		    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+		    		String name = attributeInfo.getName();
+	    			 if(name.equals(desName)) {
+	    				 OpMetric aMetric = gsonJob.new OpMetric();
+	    				 aMetric.name = name;
+	    				 aMetric.type = attributeInfo.getType();
+	    				 try {
+							aMetric.value = String.valueOf(mBeanServer.getAttribute(mObjName, name));
+						} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+								| ReflectionException e) {
+						    logger.error("Exception caught while accessing MBean", e);
+						}
+    					if (!gsonJob.isOpInJob(opName)) {
+    					    anOp = gsonJob.new Operator();
+    					    gsonJob.addOp(anOp);
+    					    anOp.opId = opName;
+    					    metrics = new ArrayList<OpMetric>();
+    					    gsonJob.setOpMetrics(anOp, metrics);
+    					} else {
+    						anOp = gsonJob.getOp(opName);
+    						metrics = anOp.metrics;
+    					}
+	    				 metrics.add(aMetric);
+	    			 }
+		    	}
+			}
+		}
+		
+		while (counterIterator.hasNext()) {
+			ArrayList<OpMetric> metrics = null;
+			ObjectInstance counterInstance = (ObjectInstance)counterIterator.next();
+			ObjectName cObjName = counterInstance.getObjectName();
+			String jobName1 = cObjName.getKeyProperty("jobId");
+			String opName1 = cObjName.getKeyProperty("opId");
+			
+
+			Operator anOp = null;
+			if (jobId.equals(jobName1)) {
+				MBeanInfo mBeanInfo = null;
+			
+				try {
+					mBeanInfo = mBeanServer.getMBeanInfo(cObjName);
+				} catch (IntrospectionException | InstanceNotFoundException | ReflectionException e) {
+				    logger.error("Exception caught while getting MBeanInfo", e);
+				}
+				
+		    	for (MBeanAttributeInfo attributeInfo : mBeanInfo.getAttributes()) {
+		    		String name = attributeInfo.getName();
+
+	    			 if(name.equals(desName)) {
+	    				 OpMetric aMetric = gsonJob.new OpMetric();
+	    				 aMetric.name = name;
+	    				 aMetric.type = attributeInfo.getType();
+	    				 try {
+							aMetric.value = String.valueOf(mBeanServer.getAttribute(cObjName, name));
+						} catch (AttributeNotFoundException | InstanceNotFoundException | MBeanException
+								| ReflectionException e) {
+						    logger.error("Exception caught while accessing MBean", e);
+						}
+    					if (!gsonJob.isOpInJob(opName1)) {
+    					    anOp = gsonJob.new Operator();
+    					    gsonJob.addOp(anOp);
+    					    anOp.opId = opName1;
+    					    metrics = new ArrayList<OpMetric>();
+    					    gsonJob.setOpMetrics(anOp, metrics);
+    					} else {
+    						anOp = gsonJob.getOp(opName1);
+    						metrics = anOp.metrics;
+    					}
+	    				 metrics.add(aMetric);
+	    			 }
+		    	}
+			}
+		}	
+		return gsonJob;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/console/servlets/src/main/java/org/apache/edgent/console/servlets/StreamScopeUtil.java
----------------------------------------------------------------------
diff --git a/console/servlets/src/main/java/org/apache/edgent/console/servlets/StreamScopeUtil.java b/console/servlets/src/main/java/org/apache/edgent/console/servlets/StreamScopeUtil.java
new file mode 100644
index 0000000..235433e
--- /dev/null
+++ b/console/servlets/src/main/java/org/apache/edgent/console/servlets/StreamScopeUtil.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.console.servlets;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.JMX;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.edgent.streamscope.mbeans.StreamScopeMXBean;
+import org.apache.edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
+
+public class StreamScopeUtil {
+  
+  /**
+   * Get the StreamScope for the specified stream.
+   * <P>
+   * N.B. until certain runtime issues are worked out, the stream that a
+   * StreamScopeMXBean is registered for is NOT the "origin stream" (opletId/oport)
+   * that the StreamScope was created for.  Rather the registration is for
+   * the actual StreamScope oplet's opletId and oport 0, so that's what must be
+   * supplied as the parameters.
+   * <BR>
+   * See the commentary in StreamScope oplet code.
+   * <BR>
+   * Once that is addressed, opletId/oport of the "origin stream" will
+   * need to be supplied as parameters.
+   * </P>
+   * 
+   * @param jobId the job id (e.g., "JOB_0")
+   * @param opletId the oplet id (e.g., "OP_2")
+   * @param oport the oplet output port index (0-based)
+   * 
+   * @return null if no StreamScope registered for that stream.
+   */
+  public static StreamScopeMXBean getStreamScope(String jobId, String opletId, int oport) {
+    return getRgy().lookup(jobId, opletId, oport);
+  }
+
+  private static StreamScopeRegistryMXBean getRgy() {
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName objName = mkObjectName(StreamScopeRegistryMXBean.class, StreamScopeRegistryMXBean.TYPE);
+    return JMX.newMXBeanProxy(mbs, objName, StreamScopeRegistryMXBean.class);
+  }
+  
+  private static ObjectName mkObjectName(Class<?> klass, String beanType) {
+    StringBuffer sbuf = new StringBuffer();
+    try {
+      sbuf.append("*:interface=");
+      sbuf.append(ObjectName.quote(klass.getCanonicalName()));
+      sbuf.append(",type=");
+      sbuf.append(ObjectName.quote(beanType));
+      return new ObjectName(sbuf.toString());
+    }
+    catch (MalformedObjectNameException e) {
+      
+      // TODO logger.error("Unable to create ObjectName for "+sbuf, e);
+      
+      throw new RuntimeException("Unable to create ObjectName for "+sbuf, e);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/console/servlets/webapp_content/WEB-INF/console.xml
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/WEB-INF/console.xml b/console/servlets/webapp_content/WEB-INF/console.xml
index e158c5b..6485208 100644
--- a/console/servlets/webapp_content/WEB-INF/console.xml
+++ b/console/servlets/webapp_content/WEB-INF/console.xml
@@ -5,7 +5,7 @@
     <servlet>
         <display-name>Console Application</display-name>
         <servlet-name>ConsoleServlet</servlet-name>
-        <servlet-class>edgent.console.servlets.ConsoleServlet</servlet-class>
+        <servlet-class>org.apache.edgent.console.servlets.ConsoleServlet</servlet-class>
         <load-on-startup>1</load-on-startup>
     </servlet>
     <servlet-mapping>
@@ -15,7 +15,7 @@
             <servlet>
    <display-name>Job Listing Service</display-name>
         <servlet-name>ConsoleJobServlet</servlet-name>
-        <servlet-class>edgent.console.servlets.ConsoleJobServlet</servlet-class>
+        <servlet-class>org.apache.edgent.console.servlets.ConsoleJobServlet</servlet-class>
         <load-on-startup>1</load-on-startup>
     </servlet>
     <servlet-mapping>
@@ -25,7 +25,7 @@
     <servlet>
       	<display-name>Metrics Listing Service</display-name>
      	<servlet-name>ConsoleMetricsServlet</servlet-name>
-     	<servlet-class>edgent.console.servlets.ConsoleMetricsServlet</servlet-class>
+     	<servlet-class>org.apache.edgent.console.servlets.ConsoleMetricsServlet</servlet-class>
      	<load-on-startup>1</load-on-startup>
       </servlet>
      <servlet-mapping>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/console/servlets/webapp_content/js/graph.js
----------------------------------------------------------------------
diff --git a/console/servlets/webapp_content/js/graph.js b/console/servlets/webapp_content/js/graph.js
index f3610e3..e279b91 100644
--- a/console/servlets/webapp_content/js/graph.js
+++ b/console/servlets/webapp_content/js/graph.js
@@ -16,15 +16,15 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-opletColor = {"edgent.streamscope.oplets.StreamScope": "#c7c7c7",
-        "edgent.metrics.oplets.CounterOp": "#c7c7c7", "edgent.metrics.oplets.RateMeter": "#aec7e8", "edgent.oplet.core.FanIn": "#ff7f0e",
-		"edgent.oplet.core.FanOut": "#ffbb78", "edgent.oplet.core.Peek": "#2ca02c", "edgent.oplet.core.PeriodicSource": "#98df8a", 
-		"edgent.oplet.core.Pipe": "#d62728", "edgent.oplet.core.PipeWindow": "#ff9896", "edgent.oplet.core.ProcessSource": "#9467bd", 
-		"edgent.oplet.core.Sink": "#c5b0d5", "edgent.oplet.core.Source": "#8c564b", "edgent.oplet.core.Split": "#c49c94", "edgent.oplet.core.Union" : "#1f77b4",
-		"edgent.oplet.functional.ConsumerEventSource": "#e377c2", "edgent.oplet.functional.ConsumerPeek": "#f7b6d2", "edgent.oplet.functional.ConsumerSink": "#7f7f7f", 
-		"edgent.oplet.functional.Filter": "#7F7F7F", "edgent.oplet.functional.FlatMapper": "#bcbd22", "edgent.oplet.functional.Isolate": "#dbdb8d", 
-		"edgent.oplet.functional.Map": "#17becf", "edgent.oplet.functional.SupplierPeriodicSource": "#9edae5", "edgent.oplet.functional.SupplierSource": "#b5cf6b", 
-		"edgent.oplet.plumbing.PressureReliever": "#e7cb94", "edgent.oplet.plumbing.TextFileReader": "#ad494a", "edgent.oplet.plumbing.UnorderedIsolate": "#de9ed6"};
+opletColor = {"org.apache.edgent.streamscope.oplets.StreamScope": "#c7c7c7",
+        "org.apache.edgent.metrics.oplets.CounterOp": "#c7c7c7", "org.apache.edgent.metrics.oplets.RateMeter": "#aec7e8", "org.apache.edgent.oplet.core.FanIn": "#ff7f0e",
+		"org.apache.edgent.oplet.core.FanOut": "#ffbb78", "org.apache.edgent.oplet.core.Peek": "#2ca02c", "org.apache.edgent.oplet.core.PeriodicSource": "#98df8a", 
+		"org.apache.edgent.oplet.core.Pipe": "#d62728", "org.apache.edgent.oplet.core.PipeWindow": "#ff9896", "org.apache.edgent.oplet.core.ProcessSource": "#9467bd", 
+		"org.apache.edgent.oplet.core.Sink": "#c5b0d5", "org.apache.edgent.oplet.core.Source": "#8c564b", "org.apache.edgent.oplet.core.Split": "#c49c94", "org.apache.edgent.oplet.core.Union" : "#1f77b4",
+		"org.apache.edgent.oplet.functional.ConsumerEventSource": "#e377c2", "org.apache.edgent.oplet.functional.ConsumerPeek": "#f7b6d2", "org.apache.edgent.oplet.functional.ConsumerSink": "#7f7f7f", 
+		"org.apache.edgent.oplet.functional.Filter": "#7F7F7F", "org.apache.edgent.oplet.functional.FlatMapper": "#bcbd22", "org.apache.edgent.oplet.functional.Isolate": "#dbdb8d", 
+		"org.apache.edgent.oplet.functional.Map": "#17becf", "org.apache.edgent.oplet.functional.SupplierPeriodicSource": "#9edae5", "org.apache.edgent.oplet.functional.SupplierSource": "#b5cf6b", 
+		"org.apache.edgent.oplet.plumbing.PressureReliever": "#e7cb94", "org.apache.edgent.oplet.plumbing.TextFileReader": "#ad494a", "org.apache.edgent.oplet.plumbing.UnorderedIsolate": "#de9ed6"};
 colorMap = {};
 
 addValuesToEdges = function(graph, counterMetrics) {
@@ -171,7 +171,7 @@ function collectEquivMetricEdges(graph, edge, isDownstream) {
         });
     }
     else if (isDownstream
-            && vertex.invocation.kind == "edgent.oplet.core.FanOut") {
+            && vertex.invocation.kind == "org.apache.edgent.oplet.core.FanOut") {
         pushArray(equivEdges, graph.edgeMap[outgoingEdgesKey(vertex.id)]);
     }
     return equivEdges;
@@ -189,15 +189,15 @@ function setEquivalentMetricEdges(graph, metricEdge) {
 function shouldTraverseVertex(vertex) {
   // TODO need an oplet tag or something to generalize this
   var kind = vertex.invocation.kind;
-  return kind === "edgent.streamscope.oplets.StreamScope"
-      || kind === "edgent.oplet.functional.Peek"
+  return kind === "org.apache.edgent.streamscope.oplets.StreamScope"
+      || kind === "org.apache.edgent.oplet.functional.Peek"
       // the following metric oplets are returned as "counter metrics" hence
       // have their own counter metric value (a contiguous set of them
       // should nominally have the same value)
-      // || kind === "edgent.metrics.oplet.RateMeter"
-      // || kind === "edgent.metrics.oplet.CounterOp"
-      // || kind === "edgent.metrics.oplet.a-Histogram-Op"
-      // || kind === "edgent.metrics.oplet.a-Timer-Op"
+      // || kind === "org.apache.edgent.metrics.oplet.RateMeter"
+      // || kind === "org.apache.edgent.metrics.oplet.CounterOp"
+      // || kind === "org.apache.edgent.metrics.oplet.a-Histogram-Op"
+      // || kind === "org.apache.edgent.metrics.oplet.a-Timer-Op"
       ;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/edgent_overview.html
----------------------------------------------------------------------
diff --git a/edgent_overview.html b/edgent_overview.html
index 0c086c7..84d0429 100644
--- a/edgent_overview.html
+++ b/edgent_overview.html
@@ -86,10 +86,10 @@ For example, you could analyze the last 90 seconds of data from a sensor to iden
 <P>
 <H3>Topology functional API</H3>
 <H4>Overview</H4>
-The primary api is {@link edgent.topology.Topology} which uses a functional
-model to build a topology of {@link edgent.topology.TStream streams} for an application.
+The primary api is {@link org.apache.edgent.topology.Topology} which uses a functional
+model to build a topology of {@link org.apache.edgent.topology.TStream streams} for an application.
 <BR>
-{@link edgent.topology.TStream TStream} is a declaration of a stream of tuples, an application will create streams that source data (e.g. sensor readings) and then apply functions that transform those streams into derived streams, for example simply filtering a stream containg engine temperator readings to a derived stream that only contains readings thar are greater than 100&deg;C.
+{@link org.apache.edgent.topology.TStream TStream} is a declaration of a stream of tuples, an application will create streams that source data (e.g. sensor readings) and then apply functions that transform those streams into derived streams, for example simply filtering a stream containg engine temperator readings to a derived stream that only contains readings thar are greater than 100&deg;C.
 <BR>
 An application terminates processing for a stream by <em>sinking</em> it. Sinking effectively terminates a stream by applying processing to each tuple on the stream (as it occurs) that does not produce a result. Typically this sinking is transmitting the tuple to an external system, for example the messgae hub to send the data to a back-end system, or locally sending the data to a user interface.
 </P>
@@ -113,9 +113,9 @@ However Edgent allows arbitrary topologies including:
 </UL>
 <H3>Graph API</H3>
 <H4>Overview</H4>
-The {@link edgent.graph.Graph graph} API is a lower-level API that the
+The {@link org.apache.edgent.graph.Graph graph} API is a lower-level API that the
 topology api is built on top of. A graph consists of
-{@link edgent.oplet.Oplet oplet} invocations connected by streams.
+{@link org.apache.edgent.oplet.Oplet oplet} invocations connected by streams.
 The oplet invocations contain the processing applied to each tuple
 on streams connected to their input ports. Processing by the oplet
 submits tuples to its output ports for subsequent processing
@@ -135,53 +135,53 @@ See the {@code README} there.
 Summary of samples:
 <TABLE border=1 width="80%" table-layout="auto">
 <TR class="rowColor"><TH>Sample</TH><TH>Description</TH><TH>Focus</TH></TR>
-<TR class="altColor"><TD>{@link edgent.samples.topology.HelloEdgent}</TD>
+<TR class="altColor"><TD>{@link org.apache.edgent.samples.topology.HelloEdgent}</TD>
   <TD>Prints Hello Edgent! to standard output.</TD>
   <TD>Basic mechanics of declaring a topology and executing it.</TD></TR>
-<TR class="altColor"><TD>{@link edgent.samples.topology.PeriodicSource}</TD>
+<TR class="altColor"><TD>{@link org.apache.edgent.samples.topology.PeriodicSource}</TD>
   <TD>Polls a random number generator for a new value every second
       and then prints out the raw value and a filtered and transformed stream.</TD>
   <TD>Polling of a data value to create a source stream.</TD></TR>
-<TR class="altColor"><TD>{@link edgent.samples.topology.SensorsAggregates}</TD>
+<TR class="altColor"><TD>{@link org.apache.edgent.samples.topology.SensorsAggregates}</TD>
   <TD>Demonstrates partitioned aggregation and filtering of simulated sensors
       that are bursty in nature, so that only intermittently
       is the data output to {@code System.out}</TD>
   <TD>Simulated sensors with windowed aggregation</TD></TR>
-<TR class="altColor"><TD>{@link edgent.samples.topology.SimpleFilterTransform}</TD>
+<TR class="altColor"><TD>{@link org.apache.edgent.samples.topology.SimpleFilterTransform}</TD>
   <TD></TD>
   <TD></TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/connectors/file/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/connectors/file/package-summary.html">
       File</a></TD>
   <TD>Write a stream of tuples to files.  Watch a directory for new files
       and create a stream of tuples from the file contents.</TD>
-  <TD>Use of the <a href="{@docRoot}/edgent/connectors/file/package-summary.html">
+  <TD>Use of the <a href="{@docRoot}/org/apache/edgent/connectors/file/package-summary.html">
       File stream connector</a></TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/connectors/iotf/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/connectors/iotf/package-summary.html">
       IotfSensors, IotfQuickstart</a></TD>
   <TD>Sends simulated sensor readings to an IBM Watson IoT Platform instance as device events.</TD>
-  <TD>Use of the <a href="{@docRoot}/edgent/connectors/iotf/package-summary.html">
+  <TD>Use of the <a href="{@docRoot}/org/apache/edgent/connectors/iotf/package-summary.html">
       IBM Watson IoT Platform connector</a> to send device events and receive device commands.</TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/connectors/jdbc/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/connectors/jdbc/package-summary.html">
       JDBC</a></TD>
   <TD>Write a stream of tuples to an Apache Derby database table.
       Create a stream of tuples by reading a table.</TD>
-  <TD>Use of the <a href="{@docRoot}/edgent/connectors/jdbc/package-summary.html">
+  <TD>Use of the <a href="{@docRoot}/org/apache/edgent/connectors/jdbc/package-summary.html">
       JDBC stream connector</a></TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/connectors/kafka/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/connectors/kafka/package-summary.html">
       Kafka</a></TD>
   <TD>Publish a stream of tuples to a Kafka topic. 
       Create a stream of tuples by subscribing to a topic and receiving 
       messages from it.</TD>
-  <TD>Use of the <a href="{@docRoot}/edgent/connectors/kafka/package-summary.html">
+  <TD>Use of the <a href="{@docRoot}/org/apache/edgent/connectors/kafka/package-summary.html">
       Kafka stream connector</a></TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/connectors/mqtt/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/connectors/mqtt/package-summary.html">
       MQTT</a></TD>
   <TD>Publish a stream of tuples to a MQTT topic. 
       Create a stream of tuples by subscribing to a topic and receiving 
       messages from it.</TD>
-  <TD>Use of the <a href="{@docRoot}/edgent/connectors/mqtt/package-summary.html">
+  <TD>Use of the <a href="{@docRoot}/org/apache/edgent/connectors/mqtt/package-summary.html">
       MQTT stream connector</a></TD></TR>
-<TR class="altColor"><TD><a href="{@docRoot}/edgent/samples/apps/sensorAnalytics/package-summary.html">
+<TR class="altColor"><TD><a href="{@docRoot}/org/apache/edgent/samples/apps/sensorAnalytics/package-summary.html">
       SensorAnalytics</a></TD>
   <TD>Demonstrates a Sensor Analytics application that includes: 
       configuration control, a device of one or more sensors and
@@ -199,9 +199,9 @@ on what features your application uses.
 Include one or both of the following:
 <ul>
 <li>{@code <edgent-target>/lib/edgent.providers.direct.jar} - if you use the
-{@link edgent.providers.direct.DirectProvider DirectProvider}</li>
+{@link org.apache.edgent.providers.direct.DirectProvider DirectProvider}</li>
 <li>{@code <edgent-target>/lib/edgent.providers.development.jar} - if you use the
-{@link edgent.providers.development.DevelopmentProvider DevelopmentProvider}</li>
+{@link org.apache.edgent.providers.development.DevelopmentProvider DevelopmentProvider}</li>
 </ul>
 Include the jar of any Edgent connector you use:
 <ul>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/main/java/edgent/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/edgent/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/edgent/providers/development/DevelopmentProvider.java
deleted file mode 100644
index 2581336..0000000
--- a/providers/development/src/main/java/edgent/providers/development/DevelopmentProvider.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.providers.development;
-
-import java.util.Hashtable;
-import java.util.concurrent.Future;
-
-import com.codahale.metrics.MetricRegistry;
-import com.google.gson.JsonObject;
-
-import edgent.console.server.HttpServer;
-import edgent.execution.Job;
-import edgent.execution.services.ControlService;
-import edgent.metrics.Metrics;
-import edgent.metrics.MetricsSetup;
-import edgent.metrics.oplets.CounterOp;
-import edgent.providers.direct.DirectProvider;
-import edgent.runtime.jmxcontrol.JMXControlService;
-import edgent.streamscope.StreamScopeRegistry;
-import edgent.streamscope.StreamScopeSetup;
-import edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
-import edgent.topology.Topology;
-
-/**
- * Provider intended for development.
- * This provider executes topologies using {@code DirectProvider}
- * and extends it by:
- * <UL>
- * <LI>
- * starting an embedded web-server providing the Edgent development console
- * that shows live graphs for running applications.
- * </LI>
- * <LI>
- * Creating a metrics registry with metrics registered
- * in the platform MBean server.
- * </LI>
- * <LI>
- * Add a {@link ControlService} that registers control management
- * beans in the platform MBean server.
- * </LI>
- * <LI>
- * Add tuple count metrics on all the streams before submitting a topology.
- * The implementation calls {@link Metrics#counter(Topology)} to insert 
- * {@link CounterOp} oplets into each stream.
- * </LI>
- * <LI>
- * Instrument the topology adding {@link edgent.streamscope.oplets.StreamScope StreamScope}
- * oplets on all the streams before submitting a topology.  
- * See {@link StreamScopeSetup#addStreamScopes(Topology) StreamScopeSetup.addStreamscopes}.
- * </LI>
- * <LI>
- * Add a {@link StreamScopeRegistry} runtime service and a
- * {@link StreamScopeRegistryMXBean} management bean to the {@code ControlService}.
- * See {@link StreamScopeSetup#register(edgent.execution.services.ServiceContainer) StreamScopeSetup.register}.
- * </LI>
- * </UL>
- * @see StreamScopeRegistry
- */
-public class DevelopmentProvider extends DirectProvider {
-    
-    /**
-     * JMX domains that this provider uses to register MBeans.
-     * Set to {@value}.
-     */
-    public static final String JMX_DOMAIN = "edgent.providers.development";
-    
-    public DevelopmentProvider() throws Exception {
-        
-        MetricsSetup.withRegistry(getServices(), new MetricRegistry()).
-                startJMXReporter(JMX_DOMAIN);
-        
-        getServices().addService(ControlService.class,
-                new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
-        
-        StreamScopeSetup.register(getServices());
-
-        HttpServer server = HttpServer.getInstance();
-        getServices().addService(HttpServer.class, server);   
-        server.startServer();
-    }
-
-    @Override
-    public Future<Job> submit(Topology topology, JsonObject config) {
-        Metrics.counter(topology);
-        StreamScopeSetup.addStreamScopes(topology);
-        return super.submit(topology, config);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/main/java/edgent/providers/development/package-info.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/edgent/providers/development/package-info.java b/providers/development/src/main/java/edgent/providers/development/package-info.java
deleted file mode 100644
index 2fa3963..0000000
--- a/providers/development/src/main/java/edgent/providers/development/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/**
- * Execution of a streaming topology in a development environment .
- */
-package edgent.providers.development;
-

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/main/java/org/apache/edgent/providers/development/DevelopmentProvider.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/org/apache/edgent/providers/development/DevelopmentProvider.java b/providers/development/src/main/java/org/apache/edgent/providers/development/DevelopmentProvider.java
new file mode 100644
index 0000000..67106f6
--- /dev/null
+++ b/providers/development/src/main/java/org/apache/edgent/providers/development/DevelopmentProvider.java
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.providers.development;
+
+import java.util.Hashtable;
+import java.util.concurrent.Future;
+
+import org.apache.edgent.console.server.HttpServer;
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.metrics.Metrics;
+import org.apache.edgent.metrics.MetricsSetup;
+import org.apache.edgent.metrics.oplets.CounterOp;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.runtime.jmxcontrol.JMXControlService;
+import org.apache.edgent.streamscope.StreamScopeRegistry;
+import org.apache.edgent.streamscope.StreamScopeSetup;
+import org.apache.edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
+import org.apache.edgent.topology.Topology;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.gson.JsonObject;
+
+/**
+ * Provider intended for development.
+ * This provider executes topologies using {@code DirectProvider}
+ * and extends it by:
+ * <UL>
+ * <LI>
+ * starting an embedded web-server providing the Edgent development console
+ * that shows live graphs for running applications.
+ * </LI>
+ * <LI>
+ * Creating a metrics registry with metrics registered
+ * in the platform MBean server.
+ * </LI>
+ * <LI>
+ * Add a {@link ControlService} that registers control management
+ * beans in the platform MBean server.
+ * </LI>
+ * <LI>
+ * Add tuple count metrics on all the streams before submitting a topology.
+ * The implementation calls {@link Metrics#counter(Topology)} to insert 
+ * {@link CounterOp} oplets into each stream.
+ * </LI>
+ * <LI>
+ * Instrument the topology adding {@link org.apache.edgent.streamscope.oplets.StreamScope StreamScope}
+ * oplets on all the streams before submitting a topology.  
+ * See {@link StreamScopeSetup#addStreamScopes(Topology) StreamScopeSetup.addStreamscopes}.
+ * </LI>
+ * <LI>
+ * Add a {@link StreamScopeRegistry} runtime service and a
+ * {@link StreamScopeRegistryMXBean} management bean to the {@code ControlService}.
+ * See {@link StreamScopeSetup#register(org.apache.edgent.execution.services.ServiceContainer) StreamScopeSetup.register}.
+ * </LI>
+ * </UL>
+ * @see StreamScopeRegistry
+ */
+public class DevelopmentProvider extends DirectProvider {
+    
+    /**
+     * JMX domains that this provider uses to register MBeans.
+     * Set to {@value}.
+     */
+    public static final String JMX_DOMAIN = "org.apache.edgent.providers.development";
+    
+    public DevelopmentProvider() throws Exception {
+        
+        MetricsSetup.withRegistry(getServices(), new MetricRegistry()).
+                startJMXReporter(JMX_DOMAIN);
+        
+        getServices().addService(ControlService.class,
+                new JMXControlService(JMX_DOMAIN, new Hashtable<>()));
+        
+        StreamScopeSetup.register(getServices());
+
+        HttpServer server = HttpServer.getInstance();
+        getServices().addService(HttpServer.class, server);   
+        server.startServer();
+    }
+
+    @Override
+    public Future<Job> submit(Topology topology, JsonObject config) {
+        Metrics.counter(topology);
+        StreamScopeSetup.addStreamScopes(topology);
+        return super.submit(topology, config);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/main/java/org/apache/edgent/providers/development/package-info.java
----------------------------------------------------------------------
diff --git a/providers/development/src/main/java/org/apache/edgent/providers/development/package-info.java b/providers/development/src/main/java/org/apache/edgent/providers/development/package-info.java
new file mode 100644
index 0000000..c16b40e
--- /dev/null
+++ b/providers/development/src/main/java/org/apache/edgent/providers/development/package-info.java
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * Execution of a streaming topology in a development environment .
+ */
+package org.apache.edgent.providers.development;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentProviderTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentProviderTest.java b/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentProviderTest.java
deleted file mode 100644
index 1b591e2..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentProviderTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collection;
-
-import org.junit.Test;
-
-import edgent.graph.Graph;
-import edgent.graph.Vertex;
-import edgent.metrics.oplets.CounterOp;
-import edgent.oplet.Oplet;
-import edgent.streamscope.oplets.StreamScope;
-import edgent.test.topology.TopologyAbstractTest;
-import edgent.topology.TStream;
-import edgent.topology.Topology;
-import edgent.topology.tester.Condition;
-
-public class DevelopmentProviderTest extends TopologyAbstractTest implements DevelopmentTestSetup {
-
-    // DevelopmentProvider inserts CounterOp metric oplets into the graph
-    @Test
-    public void testMetricsEverywhere() throws Exception {
-
-        Topology t = newTopology();
-        TStream<String> s = t.strings("a", "b", "c");
-
-        // Condition inserts a sink
-        Condition<Long> tc = t.getTester().tupleCount(s, 3);
-
-        Graph g = t.graph();
-        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
-        
-        // Two vertices before submission
-        assertEquals(2, vertices.size());
-
-        complete(t, tc);
-  
-        // At least three vertices after submission
-        // (provide may have added other oplets as well)
-        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
-        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
-        
-        // There is exactly one vertex for a metric oplet
-        int numOplets = 0;
-        for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
-            Oplet<?,?> oplet = v.getInstance();
-            if (oplet instanceof CounterOp) {
-                numOplets++;
-            }
-        }
-        assertEquals(1, numOplets);
-    }
-
-    // DevelopmentProvider inserts StreamScope oplets into the graph
-    @Test
-    public void testStreamScopesEverywhere() throws Exception {
-
-        Topology t = newTopology();
-        TStream<String> s = t.strings("a", "b", "c");
-        s = s.map(tuple -> tuple)
-            .filter(tuple -> true);
-
-        // Condition inserts a sink
-        Condition<Long> tc = t.getTester().tupleCount(s, 3);
-
-        Graph g = t.graph();
-        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
-        
-        // Four vertices before submission
-        assertEquals(4, vertices.size());
-
-        complete(t, tc);
-  
-        // At least 4+3 vertices after submission
-        // (provide may have added other oplets as well)
-        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
-        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 7);
-        
-        // There are exactly 3 vertex for a StreamScope oplet
-        int numOplets = 0;
-        for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
-            Oplet<?,?> oplet = v.getInstance();
-            if (oplet instanceof StreamScope) {
-                numOplets++;
-            }
-        }
-        assertEquals(3, numOplets);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentTestSetup.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentTestSetup.java b/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentTestSetup.java
deleted file mode 100644
index 2d34482..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/DevelopmentTestSetup.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev;
-
-import edgent.execution.Job;
-import edgent.execution.Submitter;
-import edgent.providers.development.DevelopmentProvider;
-import edgent.test.topology.TopologyTestSetup;
-import edgent.topology.Topology;
-
-public interface DevelopmentTestSetup extends TopologyTestSetup {
-    @Override
-    default DevelopmentProvider createTopologyProvider() {
-        try {
-            return new DevelopmentProvider();
-        }
-        catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    default Submitter<Topology, Job> createSubmitter() {
-        return (DevelopmentProvider) getTopologyProvider();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java b/providers/development/src/test/java/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
deleted file mode 100644
index 0e5087d..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev.streamscope;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-
-import org.junit.Test;
-
-import com.google.gson.Gson;
-
-import edgent.execution.services.ControlService;
-import edgent.streamscope.StreamScope;
-import edgent.streamscope.StreamScopeRegistry;
-import edgent.streamscope.StreamScope.Sample;
-import edgent.streamscope.mbeans.StreamScopeMXBean;
-import edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
-import edgent.test.providers.dev.DevelopmentTestSetup;
-import edgent.test.streamscope.StreamScopeTest;
-import edgent.topology.Topology;
-
-public class DevelopmentStreamScopeTest extends StreamScopeTest implements DevelopmentTestSetup {
-  
-  @Test
-  public void testServiceRegistered() throws Exception {
-    Topology t1 = newTopology();
-    StreamScopeRegistry rgy1 = t1.getRuntimeServiceSupplier().get()
-        .getService(StreamScopeRegistry.class);
-    assertNotNull(rgy1);
-    
-    Topology t2 = newTopology();
-    StreamScopeRegistry rgy2 = t2.getRuntimeServiceSupplier().get()
-        .getService(StreamScopeRegistry.class);
-    assertNotNull(rgy2);
-    
-    assertSame(rgy1, rgy2);
-  }
-  
-  @Test
-  public void testRegistryControlRegistered() throws Exception {
-    Topology t1 = newTopology();
-    ControlService cs1 = t1.getRuntimeServiceSupplier().get()
-        .getService(ControlService.class);
-    StreamScopeRegistryMXBean rgy1 = cs1.getControl(StreamScopeRegistryMXBean.TYPE,
-        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
-    assertNotNull(rgy1);
-    
-    Topology t2 = newTopology();
-    ControlService cs2 = t2.getRuntimeServiceSupplier().get()
-        .getService(ControlService.class);
-    StreamScopeRegistryMXBean rgy2 = cs2.getControl(StreamScopeRegistryMXBean.TYPE,
-        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
-    assertNotNull(rgy2);
-    
-    // The rgy1, rgy1 mbean instances may or may not be the same object
-    // depending on the ControlService implementation.  For JMXControlService,
-    // each getControl() yields a different MXBeanProxy instance but they are
-    // for the underlying bean (same objectname).
-    //assertSame(rgy1, rgy2);
-  }
-  
-  @Test
-  public void testStreamScopeBeans() throws Exception {
-    testStreamScopeBeans("JOB_1000");
-  }
-  
-  private void testStreamScopeBeans(String jobId) throws Exception {
-    // Development provider should have controls registered.
-    
-    // Get the Rgy and RgyBean
-    Topology t1 = newTopology();
-    StreamScopeRegistry rgy = t1.getRuntimeServiceSupplier().get()
-        .getService(StreamScopeRegistry.class);
-    assertNotNull(rgy);
-    ControlService cs = t1.getRuntimeServiceSupplier().get()
-                          .getService(ControlService.class);
-    StreamScopeRegistryMXBean rgyBean = 
-        cs.getControl(StreamScopeRegistryMXBean.TYPE,
-            StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
-    assertNotNull(rgyBean);
-    
-    // Add a StreamScope and verify it can be located via the controls
-    StreamScope<Integer> ss1 = new StreamScope<Integer>();
-    String streamId = StreamScopeRegistry.mkStreamId(jobId, "OP_1", 2);
-    rgy.register(StreamScopeRegistry.nameForStreamId(streamId), ss1);
-    
-    StreamScopeMXBean ss1Bean = rgyBean.lookup(jobId, "OP_1", 2);
-    assertNotNull(ss1Bean);
-    
-    ss1.setEnabled(true);
-    ss1.accept(100);
-    ss1.accept(101);
-    ss1.accept(102);
-    // access via the bean
-    assertEquals(3, ss1Bean.getSampleCount());
-    String json = ss1Bean.getSamples();
-    assertNotNull(json);
-    
-    Gson gson = new Gson();
-    Sample<?>[] sa = gson.fromJson(json, Sample[].class);
-    for (int i = 0; i < 3; i++) {
-      Sample<?> s = sa[i];
-      Object t = s.tuple(); // fyi, w/o type info fromJson() yields a Double for the numeric
-      assertEquals(t, i+100.0);
-    }
-  }
-  
-  @Test
-  public void testStreamScopeBeans2() throws Exception {
-    // verify successive providers and rgyBean control hackery works
-    testStreamScopeBeans("JOB_1001");
-  }
-
-  // Ideally would test that beans are available via JMX and/or servlet.StreamScopeUtil stuff works
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java b/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
deleted file mode 100644
index 92af032..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev.topology;
-
-import edgent.test.providers.dev.DevelopmentTestSetup;
-import edgent.test.topology.PlumbingTest;
-
-public class DevelopmentPlumbingTest extends PlumbingTest implements DevelopmentTestSetup {
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java b/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
deleted file mode 100644
index f0b5044..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev.topology;
-
-import edgent.test.providers.dev.DevelopmentTestSetup;
-import edgent.test.topology.TStreamTest;
-
-public class DevelopmentTStreamTest extends TStreamTest implements DevelopmentTestSetup {
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java b/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
deleted file mode 100644
index 913fa42..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev.topology;
-
-import edgent.test.providers.dev.DevelopmentTestSetup;
-import edgent.test.topology.TopologyTest;
-
-public class DevelopmentTopologyTest extends TopologyTest implements DevelopmentTestSetup {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentWindowTest.java b/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
deleted file mode 100644
index f2cf8b9..0000000
--- a/providers/development/src/test/java/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.test.providers.dev.topology;
-
-import edgent.test.providers.dev.DevelopmentTestSetup;
-import edgent.test.topology.TWindowTest;
-
-public class DevelopmentWindowTest extends TWindowTest implements DevelopmentTestSetup {
-}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentProviderTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentProviderTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentProviderTest.java
new file mode 100644
index 0000000..18df202
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentProviderTest.java
@@ -0,0 +1,108 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collection;
+
+import org.apache.edgent.graph.Graph;
+import org.apache.edgent.graph.Vertex;
+import org.apache.edgent.metrics.oplets.CounterOp;
+import org.apache.edgent.oplet.Oplet;
+import org.apache.edgent.streamscope.oplets.StreamScope;
+import org.apache.edgent.test.topology.TopologyAbstractTest;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.tester.Condition;
+import org.junit.Test;
+
+public class DevelopmentProviderTest extends TopologyAbstractTest implements DevelopmentTestSetup {
+
+    // DevelopmentProvider inserts CounterOp metric oplets into the graph
+    @Test
+    public void testMetricsEverywhere() throws Exception {
+
+        Topology t = newTopology();
+        TStream<String> s = t.strings("a", "b", "c");
+
+        // Condition inserts a sink
+        Condition<Long> tc = t.getTester().tupleCount(s, 3);
+
+        Graph g = t.graph();
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
+        
+        // Two vertices before submission
+        assertEquals(2, vertices.size());
+
+        complete(t, tc);
+  
+        // At least three vertices after submission
+        // (provide may have added other oplets as well)
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
+        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 3);
+        
+        // There is exactly one vertex for a metric oplet
+        int numOplets = 0;
+        for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
+            Oplet<?,?> oplet = v.getInstance();
+            if (oplet instanceof CounterOp) {
+                numOplets++;
+            }
+        }
+        assertEquals(1, numOplets);
+    }
+
+    // DevelopmentProvider inserts StreamScope oplets into the graph
+    @Test
+    public void testStreamScopesEverywhere() throws Exception {
+
+        Topology t = newTopology();
+        TStream<String> s = t.strings("a", "b", "c");
+        s = s.map(tuple -> tuple)
+            .filter(tuple -> true);
+
+        // Condition inserts a sink
+        Condition<Long> tc = t.getTester().tupleCount(s, 3);
+
+        Graph g = t.graph();
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> vertices = g.getVertices();
+        
+        // Four vertices before submission
+        assertEquals(4, vertices.size());
+
+        complete(t, tc);
+  
+        // At least 4+3 vertices after submission
+        // (provide may have added other oplets as well)
+        Collection<Vertex<? extends Oplet<?, ?>, ?, ?>> verticesAfterSubmit = g.getVertices();
+        assertTrue("size="+verticesAfterSubmit.size(), verticesAfterSubmit.size() >= 7);
+        
+        // There are exactly 3 vertex for a StreamScope oplet
+        int numOplets = 0;
+        for (Vertex<? extends Oplet<?, ?>, ?, ?> v : verticesAfterSubmit) {
+            Oplet<?,?> oplet = v.getInstance();
+            if (oplet instanceof StreamScope) {
+                numOplets++;
+            }
+        }
+        assertEquals(3, numOplets);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentTestSetup.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentTestSetup.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentTestSetup.java
new file mode 100644
index 0000000..d648f2b
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/DevelopmentTestSetup.java
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev;
+
+import org.apache.edgent.execution.Job;
+import org.apache.edgent.execution.Submitter;
+import org.apache.edgent.providers.development.DevelopmentProvider;
+import org.apache.edgent.test.topology.TopologyTestSetup;
+import org.apache.edgent.topology.Topology;
+
+public interface DevelopmentTestSetup extends TopologyTestSetup {
+    @Override
+    default DevelopmentProvider createTopologyProvider() {
+        try {
+            return new DevelopmentProvider();
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    default Submitter<Topology, Job> createSubmitter() {
+        return (DevelopmentProvider) getTopologyProvider();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
new file mode 100644
index 0000000..36d975e
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/streamscope/DevelopmentStreamScopeTest.java
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev.streamscope;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+import org.apache.edgent.execution.services.ControlService;
+import org.apache.edgent.streamscope.StreamScope;
+import org.apache.edgent.streamscope.StreamScopeRegistry;
+import org.apache.edgent.streamscope.StreamScope.Sample;
+import org.apache.edgent.streamscope.mbeans.StreamScopeMXBean;
+import org.apache.edgent.streamscope.mbeans.StreamScopeRegistryMXBean;
+import org.apache.edgent.test.providers.dev.DevelopmentTestSetup;
+import org.apache.edgent.test.streamscope.StreamScopeTest;
+import org.apache.edgent.topology.Topology;
+import org.junit.Test;
+
+import com.google.gson.Gson;
+
+public class DevelopmentStreamScopeTest extends StreamScopeTest implements DevelopmentTestSetup {
+  
+  @Test
+  public void testServiceRegistered() throws Exception {
+    Topology t1 = newTopology();
+    StreamScopeRegistry rgy1 = t1.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy1);
+    
+    Topology t2 = newTopology();
+    StreamScopeRegistry rgy2 = t2.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy2);
+    
+    assertSame(rgy1, rgy2);
+  }
+  
+  @Test
+  public void testRegistryControlRegistered() throws Exception {
+    Topology t1 = newTopology();
+    ControlService cs1 = t1.getRuntimeServiceSupplier().get()
+        .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgy1 = cs1.getControl(StreamScopeRegistryMXBean.TYPE,
+        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgy1);
+    
+    Topology t2 = newTopology();
+    ControlService cs2 = t2.getRuntimeServiceSupplier().get()
+        .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgy2 = cs2.getControl(StreamScopeRegistryMXBean.TYPE,
+        StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgy2);
+    
+    // The rgy1, rgy1 mbean instances may or may not be the same object
+    // depending on the ControlService implementation.  For JMXControlService,
+    // each getControl() yields a different MXBeanProxy instance but they are
+    // for the underlying bean (same objectname).
+    //assertSame(rgy1, rgy2);
+  }
+  
+  @Test
+  public void testStreamScopeBeans() throws Exception {
+    testStreamScopeBeans("JOB_1000");
+  }
+  
+  private void testStreamScopeBeans(String jobId) throws Exception {
+    // Development provider should have controls registered.
+    
+    // Get the Rgy and RgyBean
+    Topology t1 = newTopology();
+    StreamScopeRegistry rgy = t1.getRuntimeServiceSupplier().get()
+        .getService(StreamScopeRegistry.class);
+    assertNotNull(rgy);
+    ControlService cs = t1.getRuntimeServiceSupplier().get()
+                          .getService(ControlService.class);
+    StreamScopeRegistryMXBean rgyBean = 
+        cs.getControl(StreamScopeRegistryMXBean.TYPE,
+            StreamScopeRegistryMXBean.TYPE, StreamScopeRegistryMXBean.class);
+    assertNotNull(rgyBean);
+    
+    // Add a StreamScope and verify it can be located via the controls
+    StreamScope<Integer> ss1 = new StreamScope<Integer>();
+    String streamId = StreamScopeRegistry.mkStreamId(jobId, "OP_1", 2);
+    rgy.register(StreamScopeRegistry.nameForStreamId(streamId), ss1);
+    
+    StreamScopeMXBean ss1Bean = rgyBean.lookup(jobId, "OP_1", 2);
+    assertNotNull(ss1Bean);
+    
+    ss1.setEnabled(true);
+    ss1.accept(100);
+    ss1.accept(101);
+    ss1.accept(102);
+    // access via the bean
+    assertEquals(3, ss1Bean.getSampleCount());
+    String json = ss1Bean.getSamples();
+    assertNotNull(json);
+    
+    Gson gson = new Gson();
+    Sample<?>[] sa = gson.fromJson(json, Sample[].class);
+    for (int i = 0; i < 3; i++) {
+      Sample<?> s = sa[i];
+      Object t = s.tuple(); // fyi, w/o type info fromJson() yields a Double for the numeric
+      assertEquals(t, i+100.0);
+    }
+  }
+  
+  @Test
+  public void testStreamScopeBeans2() throws Exception {
+    // verify successive providers and rgyBean control hackery works
+    testStreamScopeBeans("JOB_1001");
+  }
+
+  // Ideally would test that beans are available via JMX and/or servlet.StreamScopeUtil stuff works
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
new file mode 100644
index 0000000..63ff7bf
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentPlumbingTest.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev.topology;
+
+import org.apache.edgent.test.providers.dev.DevelopmentTestSetup;
+import org.apache.edgent.test.topology.PlumbingTest;
+
+public class DevelopmentPlumbingTest extends PlumbingTest implements DevelopmentTestSetup {
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
new file mode 100644
index 0000000..ef9b9db
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTStreamTest.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev.topology;
+
+import org.apache.edgent.test.providers.dev.DevelopmentTestSetup;
+import org.apache.edgent.test.topology.TStreamTest;
+
+public class DevelopmentTStreamTest extends TStreamTest implements DevelopmentTestSetup {
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
new file mode 100644
index 0000000..182ce56
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentTopologyTest.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev.topology;
+
+import org.apache.edgent.test.providers.dev.DevelopmentTestSetup;
+import org.apache.edgent.test.topology.TopologyTest;
+
+public class DevelopmentTopologyTest extends TopologyTest implements DevelopmentTestSetup {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
----------------------------------------------------------------------
diff --git a/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentWindowTest.java b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
new file mode 100644
index 0000000..4bb9343
--- /dev/null
+++ b/providers/development/src/test/java/org/apache/edgent/test/providers/dev/topology/DevelopmentWindowTest.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.test.providers.dev.topology;
+
+import org.apache.edgent.test.providers.dev.DevelopmentTestSetup;
+import org.apache.edgent.test.topology.TWindowTest;
+
+public class DevelopmentWindowTest extends TWindowTest implements DevelopmentTestSetup {
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/direct/build.gradle
----------------------------------------------------------------------
diff --git a/providers/direct/build.gradle b/providers/direct/build.gradle
index bf50af5..7c2da58 100644
--- a/providers/direct/build.gradle
+++ b/providers/direct/build.gradle
@@ -15,6 +15,7 @@ dependencies {
   compile project(':api:topology')
   compile project(':spi:topology')
   compile project(':spi:graph')
+  compile project(':runtime:appservice')
   compile project(':runtime:etiao')
   compile project(':runtime:jsoncontrol')
   compile ext_classpath

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/a129aa16/providers/direct/src/main/java/edgent/providers/direct/DirectProvider.java
----------------------------------------------------------------------
diff --git a/providers/direct/src/main/java/edgent/providers/direct/DirectProvider.java b/providers/direct/src/main/java/edgent/providers/direct/DirectProvider.java
deleted file mode 100644
index cbfdc03..0000000
--- a/providers/direct/src/main/java/edgent/providers/direct/DirectProvider.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package edgent.providers.direct;
-
-import java.util.concurrent.Future;
-
-import com.google.gson.JsonObject;
-
-import edgent.execution.DirectSubmitter;
-import edgent.execution.Job;
-import edgent.execution.services.ControlService;
-import edgent.execution.services.ServiceContainer;
-import edgent.runtime.jsoncontrol.JsonControlService;
-import edgent.topology.Topology;
-import edgent.topology.TopologyProvider;
-import edgent.topology.spi.AbstractTopologyProvider;
-
-/**
- * {@code DirectProvider} is a {@link TopologyProvider} that
- * runs a submitted topology as a {@link Job} in threads
- * in the current virtual machine.
- * <P> 
- * A job (execution of a topology) continues to execute
- * while any of its elements have remaining work,
- * such as any of the topology's source streams are capable
- * of generating tuples.
- * <BR>
- * "Endless" source streams never terminate - e.g., a stream
- * created by {@link Topology#generate(edgent.function.Supplier) generate()},
- * {@link Topology#poll(edgent.function.Supplier, long, java.util.concurrent.TimeUnit) poll()},
- * or {@link Topology#events(edgent.function.Consumer) events()}.
- * Hence a job with such sources runs until either it or some other
- * entity terminates it.
- * </P>
- */
-public class DirectProvider extends AbstractTopologyProvider<DirectTopology>
-        implements DirectSubmitter<Topology, Job> {
-
-    private final ServiceContainer services;
-    
-    public DirectProvider() {
-        this.services = new ServiceContainer();
-        
-        getServices().addService(ControlService.class, new JsonControlService());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <P>
-     * The returned services instance is shared
-     * across all jobs submitted to this provider. 
-     * </P>
-     */
-    @Override
-    public ServiceContainer getServices() {
-        return services;
-    }
-
-    @Override
-    public DirectTopology newTopology(String name) {
-        return new DirectTopology(name, services);
-    }
-
-    @Override
-    public Future<Job> submit(Topology topology) {
-        return submit(topology, new JsonObject());
-    }
-    
-    @Override
-    public Future<Job> submit(Topology topology, JsonObject config) {
-        return ((DirectTopology) topology).executeCallable(config);
-    }
-}



Mime
View raw message