Return-Path: X-Original-To: apmail-apex-commits-archive@minotaur.apache.org Delivered-To: apmail-apex-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7B471728D for ; Wed, 9 Sep 2015 22:12:52 +0000 (UTC) Received: (qmail 18811 invoked by uid 500); 9 Sep 2015 22:12:52 -0000 Delivered-To: apmail-apex-commits-archive@apex.apache.org Received: (qmail 18773 invoked by uid 500); 9 Sep 2015 22:12:52 -0000 Mailing-List: contact commits-help@apex.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@apex.incubator.apache.org Delivered-To: mailing list commits@apex.incubator.apache.org Received: (qmail 18764 invoked by uid 99); 9 Sep 2015 22:12:52 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 22:12:52 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 48CC918096B for ; Wed, 9 Sep 2015 22:12:52 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.775 X-Spam-Level: * X-Spam-Status: No, score=1.775 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.006, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id w7aaWnnxrlGv for ; Wed, 9 Sep 2015 22:12:42 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id B34F644570 for ; Wed, 9 Sep 2015 22:12:33 +0000 (UTC) Received: (qmail 15148 invoked by uid 99); 9 Sep 2015 22:12:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 09 Sep 2015 22:12:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1B5FEE0A03; Wed, 9 Sep 2015 22:12:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chetan@apache.org To: commits@apex.incubator.apache.org Date: Wed, 09 Sep 2015 22:12:40 -0000 Message-Id: <773de725aa364561b399ec569fbf6050@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/50] incubator-apex-core git commit: Schema Support Schema Support Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/61929b58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/61929b58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/61929b58 Branch: refs/heads/master Commit: 61929b58f9dbf32c281c845197445e728fffa866 Parents: 66a75e0 Author: Chandni Singh Authored: Sun Aug 2 13:29:16 2015 -0700 Committer: Chandni Singh Committed: Wed Aug 5 13:13:05 2015 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 7 + .../annotation/InputPortFieldAnnotation.java | 10 +- .../annotation/OutputPortFieldAnnotation.java | 10 + .../java/com/datatorrent/stram/cli/DTCli.java | 15 +- .../stram/plan/logical/LogicalPlan.java | 15 ++ .../plan/logical/LogicalPlanConfiguration.java | 26 +- .../stram/webapp/OperatorDiscoverer.java | 235 +++++++++++++------ .../com/datatorrent/stram/webapp/TypeGraph.java | 28 +++ .../plan/LogicalPlanConfigurationTest.java | 65 ++++- .../stram/plan/SchemaTestOperator.java | 33 +++ .../stram/webapp/OperatorDiscoveryTest.java | 64 ++++- .../src/test/resources/schemaTestTopology.json | 43 ++++ 12 files changed, 473 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 1417389..249cecd 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -151,6 +151,13 @@ public interface Context * a generic codec. */ Attribute> STREAM_CODEC = new Attribute>(new Object2String>()); + + /** + * Provides the tuple class which the port receives or emits. While this attribute is null by default, + * whether it is needed or not is controlled through the port annotation. + */ + Attribute> TUPLE_CLASS = new Attribute<>(new Object2String>()); + @SuppressWarnings("FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java index 965eab3..2734bf6 100644 --- a/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java +++ b/api/src/main/java/com/datatorrent/api/annotation/InputPortFieldAnnotation.java @@ -16,8 +16,8 @@ package com.datatorrent.api.annotation; import java.lang.annotation.*; + /** - * * Annotation for input ports on streaming operators.

* * @since 0.3.2 @@ -33,4 +33,12 @@ public @interface InputPortFieldAnnotation * @return - true if port is optional, false otherwise. */ public boolean optional() default false; + + /** + * Whether this port needs to know the tuple class. When true, application will have to set + * the port attribute- TUPLE_CLASS of the port otherwise dag validation will fail. + * + * @return true if schema is required; false otherwise. + */ + public boolean schemaRequired() default false; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java index 154c1df..bb585c6 100644 --- a/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java +++ b/api/src/main/java/com/datatorrent/api/annotation/OutputPortFieldAnnotation.java @@ -21,6 +21,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import com.datatorrent.api.Context; /** * @@ -40,4 +41,13 @@ public @interface OutputPortFieldAnnotation { *

error.

*/ public boolean error() default false; + + /** + * Whether this port needs to know the tuple class. When true, application will have to set + * the port attribute- TUPLE_CLASS of the port otherwise dag validation will fail. + * + * @return true if schema is required; false otherwise. + */ + public boolean schemaRequired() default false; } + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java index 936ba25..eff2404 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/DTCli.java @@ -3012,6 +3012,8 @@ public class DTCli JSONObject portClassHier = new JSONObject(); JSONObject failed = new JSONObject(); + JSONObject portTypesWithSchemaClasses = new JSONObject(); + for (Class clazz : operatorClasses) { try { JSONObject oper = operatorDiscoverer.describeOperator(clazz); @@ -3021,8 +3023,15 @@ public class DTCli String s = defaultValueMapper.writeValueAsString(operIns); oper.put("defaultValue", new JSONObject(s).get(clazz.getName())); - // add class hier info to portClassHier - operatorDiscoverer.buildPortClassHier(oper, portClassHier); + // add class hierarchy info to portClassHier and fetch port types with schema classes + operatorDiscoverer.buildAdditionalPortInfo(oper, portClassHier, portTypesWithSchemaClasses); + + Iterator portTypesIter = portTypesWithSchemaClasses.keys(); + while (portTypesIter.hasNext()) { + if (!portTypesWithSchemaClasses.getBoolean((String) portTypesIter.next())) { + portTypesIter.remove(); + } + } arr.put(oper); } catch (Exception | NoClassDefFoundError ex) { @@ -3031,8 +3040,10 @@ public class DTCli failed.put(cls, ex.toString()); } } + json.put("operatorClasses", arr); json.put("portClassHier", portClassHier); + json.put("portTypesWithSchemaClasses", portTypesWithSchemaClasses); if (failed.length() > 0) { json.put("failedOperators", failed); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index b1e7d94..fc182cd 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -1170,6 +1170,13 @@ public class LogicalPlan implements Serializable, DAG validateThreadLocal(n); } } + + if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) { + //since schema is required, the port attribute TUPLE_CLASS should be present + if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) { + throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName()); + } + } } } @@ -1179,6 +1186,14 @@ public class LogicalPlan implements Serializable, DAG if (pm.portAnnotation != null && !pm.portAnnotation.optional()) { throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName()); } + } else { + //port is connected + if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) { + //since schema is required, the port attribute TUPLE_CLASS should be present + if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) { + throw new ValidationException("Attribute " + PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName()); + } + } } allPortsOptional &= (pm.portAnnotation != null && pm.portAnnotation.optional()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java index 3e3326b..d838a2d 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java @@ -77,6 +77,7 @@ public class LogicalPlanConfiguration { public static final String STREAM_SINKS = "sinks"; public static final String STREAM_TEMPLATE = "template"; public static final String STREAM_LOCALITY = "locality"; + public static final String STREAM_SCHEMA = "schema"; public static final String OPERATOR_PREFIX = StreamingApplication.DT_PREFIX + "operator."; public static final String OPERATOR_CLASSNAME = "classname"; @@ -908,6 +909,11 @@ public class LogicalPlanConfiguration { if (locality != null) { prop.setProperty(streamPrefix + STREAM_LOCALITY, locality); } + JSONObject schema = stream.optJSONObject("schema"); + if (schema != null) { + String schemaClass = schema.getString("class"); + prop.setProperty(streamPrefix + STREAM_SCHEMA, schemaClass); + } } return addFromProperties(prop, conf); } @@ -1126,6 +1132,16 @@ public class LogicalPlanConfiguration { DAG.StreamMeta sd = dag.addStream(streamConfEntry.getKey()); sd.setLocality(streamConf.getLocality()); + String schemaClassName = streamConf.properties.getProperty(STREAM_SCHEMA); + Class schemaClass = null; + if (schemaClassName != null) { + try { + schemaClass = Class.forName(schemaClassName); + } catch (ClassNotFoundException e) { + throw new ValidationException("schema class not found: " + schemaClassName); + } + } + if (streamConf.sourceNode != null) { String portName = null; for (Map.Entry e : streamConf.sourceNode.outputs.entrySet()) { @@ -1137,6 +1153,10 @@ public class LogicalPlanConfiguration { Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor(); Operators.describe(sourceDecl, sourcePortMap); sd.setSource(sourcePortMap.outputPorts.get(portName).component); + + if (schemaClass != null) { + dag.setOutputPortAttribute(sourcePortMap.outputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass); + } } for (OperatorConf targetNode : streamConf.targetNodes) { @@ -1150,6 +1170,10 @@ public class LogicalPlanConfiguration { Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor(); Operators.describe(targetDecl, targetPortMap); sd.addSink(targetPortMap.inputPorts.get(portName).component); + + if (schemaClass != null) { + dag.setInputPortAttribute(targetPortMap.inputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass); + } } } @@ -1164,7 +1188,7 @@ public class LogicalPlanConfiguration { */ public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name) { - // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attt below is used + // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName()); dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null? conf.get(GATEWAY_LISTEN_ADDRESS): connectAddress); if (app != null) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java index 60e35da..004c100 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/OperatorDiscoverer.java @@ -22,8 +22,10 @@ import com.datatorrent.stram.webapp.TypeDiscoverer.UI_TYPE; import com.datatorrent.stram.webapp.asm.CompactAnnotationNode; import com.datatorrent.stram.webapp.asm.CompactFieldNode; import com.google.common.base.Predicate; +import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.beans.*; @@ -44,7 +46,6 @@ import javax.xml.parsers.*; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.text.WordUtils; import org.codehaus.jettison.json.*; -import org.apache.xbean.asm5.tree.AnnotationNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.Attributes; @@ -60,6 +61,8 @@ import org.xml.sax.helpers.DefaultHandler; */ public class OperatorDiscoverer { + public static final String GENERATED_CLASSES_JAR = "_generated-classes.jar"; + private static class ClassComparator implements Comparator> { @Override @@ -73,21 +76,34 @@ public class OperatorDiscoverer private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoverer.class); private final List pathsToScan = new ArrayList(); private final ClassLoader classLoader; - private final String dtOperatorDoclinkPrefix = "https://www.datatorrent.com/docs/apidocs/index.html"; + private static final String DT_OPERATOR_DOCLINK_PREFIX = "https://www.datatorrent.com/docs/apidocs/index.html"; public static final String PORT_TYPE_INFO_KEY = "portTypeInfo"; private final TypeGraph typeGraph = TypeGraphFactory.createTypeGraphProtoType(); + private static final String USE_SCHEMA_TAG = "@useSchema"; + private static final String DESCRIPTION_TAG = "@description"; + private static final Pattern WHITESPACE_PATTERN = Pattern.compile("\\s+?"); + + private static final String SCHEMA_REQUIRED_KEY = "schemaRequired"; + private final Map classInfo = new HashMap(); private static class OperatorClassInfo { String comment; final Map tags = new HashMap(); - final Map getMethods = new HashMap(); - final Map setMethods = new HashMap(); + final Map getMethods = Maps.newHashMap(); + final Map setMethods = Maps.newHashMap(); final Set invisibleGetSetMethods = new HashSet(); final Map fields = new HashMap(); } + private static class MethodInfo + { + Map descriptions = Maps.newHashMap(); + Map useSchemas = Maps.newHashMap(); + String comment; + } + private class JavadocSAXHandler extends DefaultHandler { private String className = null; @@ -122,11 +138,19 @@ public class OperatorDiscoverer else if (qName.equalsIgnoreCase("tag")) { if (oci != null) { String tagName = attributes.getValue("name"); - String tagText = attributes.getValue("text"); + String tagText = attributes.getValue("text").trim(); if (methodName != null) { - if("@omitFromUI".equals(tagName) && (isGetter(methodName) || isSetter(methodName))) - { - oci.invisibleGetSetMethods.add(methodName); + boolean lGetterCheck = isGetter(methodName); + boolean lSetterCheck = !lGetterCheck && isSetter(methodName); + + if (lGetterCheck || lSetterCheck) { + if ("@omitFromUI".equals(tagName)) { + oci.invisibleGetSetMethods.add(methodName); + } else if (DESCRIPTION_TAG.equals(tagName)) { + addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, true); + } else if (USE_SCHEMA_TAG.equals(tagName)) { + addTagToMethod(lGetterCheck ? oci.getMethods : oci.setMethods, tagText, false); + } } // if ("@return".equals(tagName) && isGetter(methodName)) { // oci.getMethods.put(methodName, tagText); @@ -149,6 +173,24 @@ public class OperatorDiscoverer } } + private void addTagToMethod(Map methods, String tagText, boolean isDescription) + { + MethodInfo mi = methods.get(methodName); + if (mi == null) { + mi = new MethodInfo(); + methods.put(methodName, mi); + } + String[] tagParts = Iterables.toArray(Splitter.on(WHITESPACE_PATTERN).trimResults().omitEmptyStrings(). + limit(2).split(tagText), String.class); + if (tagParts.length == 2) { + if (isDescription) { + mi.descriptions.put(tagParts[0], tagParts[1]); + } else { + mi.useSchemas.put(tagParts[0], tagParts[1]); + } + } + } + @Override public void endElement(String uri, String localName, String qName) throws SAXException { if (qName.equalsIgnoreCase("class")) { @@ -160,9 +202,19 @@ public class OperatorDiscoverer if (methodName != null) { // do nothing if (isGetter(methodName)) { - oci.getMethods.put(methodName, comment.toString()); + MethodInfo mi = oci.getMethods.get(methodName); + if (mi == null) { + mi = new MethodInfo(); + oci.getMethods.put(methodName, mi); + } + mi.comment = comment.toString(); } else if (isSetter(methodName)) { - oci.setMethods.put(methodName, comment.toString()); + MethodInfo mi = oci.setMethods.get(methodName); + if (mi == null) { + mi = new MethodInfo(); + oci.setMethods.put(methodName, mi); + } + mi.comment = comment.toString(); } } else if (fieldName != null) { @@ -236,7 +288,7 @@ public class OperatorDiscoverer { Map openJarFiles = new HashMap(); Map openClassFiles = new HashMap(); - try { + try { for (String path : pathsToScan) { File f = null; try { @@ -244,6 +296,9 @@ public class OperatorDiscoverer if (!f.exists() || f.isDirectory() || (!f.getName().endsWith("jar") && !f.getName().endsWith("class"))) { continue; } + if (GENERATED_CLASSES_JAR.equals(f.getName())) { + continue; + } if (f.getName().endsWith("class")) { typeGraph.addNode(f); openClassFiles.put(path, f); @@ -410,6 +465,9 @@ public class OperatorDiscoverer if (!inputPort.has("optional")) { inputPort.put("optional", false); // input port that is not annotated is default to be not optional } + if (!inputPort.has(SCHEMA_REQUIRED_KEY)) { + inputPort.put(SCHEMA_REQUIRED_KEY, false); + } inputPorts.put(inputPort); } @@ -422,6 +480,9 @@ public class OperatorDiscoverer if (!outputPort.has("error")) { outputPort.put("error", false); } + if (!outputPort.has(SCHEMA_REQUIRED_KEY)) { + outputPort.put(SCHEMA_REQUIRED_KEY, false); + } outputPorts.put(outputPort); } @@ -471,7 +532,7 @@ public class OperatorDiscoverer } else if (clazz.getName().startsWith("com.datatorrent.lib.") || clazz.getName().startsWith("com.datatorrent.contrib.")) { - response.put("doclink", dtOperatorDoclinkPrefix + "?" + getDocName(clazz)); + response.put("doclink", DT_OPERATOR_DOCLINK_PREFIX + "?" + getDocName(clazz)); } } } @@ -531,10 +592,10 @@ public class OperatorDiscoverer if (oci.invisibleGetSetMethods.contains(getPrefix + propName) || oci.invisibleGetSetMethods.contains(setPrefix + propName)) { continue; } - String desc = oci.setMethods.get(setPrefix + propName); - desc = desc == null ? oci.getMethods.get(getPrefix + propName) : desc; - if (desc != null) { - propJ.put("description", desc); + MethodInfo methodInfo = oci.setMethods.get(setPrefix + propName); + methodInfo = methodInfo == null ? oci.getMethods.get(getPrefix + propName) : methodInfo; + if (methodInfo != null) { + addTagsToProperties(methodInfo, propJ); } result.put(propJ); } @@ -553,6 +614,32 @@ public class OperatorDiscoverer } } + private void addTagsToProperties(MethodInfo mi, JSONObject propJ) throws JSONException + { + //create description object. description tag enables the visual tools to display description of keys/values + //of a map property, items of a list, properties within a complex type. + JSONObject descriptionObj = new JSONObject(); + if (mi.comment != null) { + descriptionObj.put("$", mi.comment); + } + for (Map.Entry descEntry : mi.descriptions.entrySet()) { + descriptionObj.put(descEntry.getKey(), descEntry.getValue()); + } + if (descriptionObj.length() > 0) { + propJ.put("descriptions", descriptionObj); + } + + //create useSchema object. useSchema tag is added to enable visual tools to be able to render a text field + //as a dropdown with choices populated from the schema attached to the port. + JSONObject useSchemaObj = new JSONObject(); + for (Map.Entry useSchemaEntry : mi.useSchemas.entrySet()) { + useSchemaObj.put(useSchemaEntry.getKey(), useSchemaEntry.getValue()); + } + if (useSchemaObj.length() > 0) { + propJ.put("useSchema", useSchemaObj); + } + } + public JSONObject describeClass(String clazzName) throws Exception { return describeClassByASM(clazzName); @@ -626,9 +713,9 @@ public class OperatorDiscoverer for (Class c = clazz; c != null; c = c.getSuperclass()) { OperatorClassInfo oci = classInfo.get(c.getName()); if (oci != null) { - String getMethodDesc = oci.getMethods.get(readMethod.getName()); - if (getMethodDesc != null) { - propertyObj.put("description", oci.getMethods.get(readMethod.getName())); + MethodInfo getMethodInfo = oci.getMethods.get(readMethod.getName()); + if (getMethodInfo != null) { + addTagsToProperties(getMethodInfo, propertyObj); break; } } @@ -673,74 +760,79 @@ public class OperatorDiscoverer /** * Enrich portClassHier with class/interface names that map to a list of parent classes/interfaces. - * For any class encountered, find its parents too. + * For any class encountered, find its parents too.
+ * Also find the port types which have assignable schema classes. * - * @param oper Operator to work on - * @param portClassHier In-Out param that contains a mapping of class/interface to its parents + * @param oper Operator to work on + * @param portClassHierarchy In-Out param that contains a mapping of class/interface to its parents + * @param portTypesWithSchemaClasses Json that will contain all the ports which have any schema classes. */ - public void buildPortClassHier(JSONObject oper, JSONObject portClassHier) { + public void buildAdditionalPortInfo(JSONObject oper, JSONObject portClassHierarchy, JSONObject portTypesWithSchemaClasses) + { try { JSONArray ports = oper.getJSONArray(OperatorDiscoverer.PORT_TYPE_INFO_KEY); - int num_ports = ports.length(); - for (int i = 0; i < num_ports; i++) { + for (int i = 0; i < ports.length(); i++) { JSONObject port = ports.getJSONObject(i); - String type; - try { - type = port.getString("type"); - } catch (JSONException e) { - // no type key + String portType = port.optString("type"); + if (portType == null) { + //skipping if port type is null continue; } - try { - // load the port type class - Class portClazz = classLoader.loadClass(type.replaceAll("\\bclass ", "").replaceAll("\\binterface ", "")); - - // iterate up the class hierarchy to populate the portClassHier map - while (portClazz != null) { - ArrayList parents = new ArrayList(); + if (typeGraph.size() == 0) { + buildTypeGraph(); + } - String portClazzName = portClazz.toString(); - if (portClassHier.has(portClazzName)) { - // already present in portClassHier, so we can stop - break; + try { + //building port class hierarchy + LinkedList queue = Lists.newLinkedList(); + queue.add(portType); + while (!queue.isEmpty()) { + String currentType = queue.remove(); + if (portClassHierarchy.has(currentType)) { + //already present in the json so we skip. + continue; } - - // interfaces and Object are at the top of the tree, so we can just put them - // in portClassHier with empty parents, then move on. - if (portClazz.isInterface() || portClazzName.equals("java.lang.Object")) { - portClassHier.put(portClazzName, parents); - break; + List immediateParents = typeGraph.getParents(currentType); + if (immediateParents == null) { + portClassHierarchy.put(currentType, Lists.newArrayList()); + continue; } + portClassHierarchy.put(currentType, immediateParents); + queue.addAll(immediateParents); + } + } catch (JSONException e) { + LOG.warn("building port type hierarchy {}", portType, e); + } - // look at superclass first - Class superClazz = portClazz.getSuperclass(); - try { - String superClazzName = superClazz.toString(); - parents.add(superClazzName); - } catch (NullPointerException e) { - LOG.info("Superclass is null for `{}` ({})", portClazz, superClazz); - } - // then look at interfaces implemented in this port - for (Class intf : portClazz.getInterfaces()) { - String intfName = intf.toString(); - if (!portClassHier.has(intfName)) { - // add the interface to portClassHier - portClassHier.put(intfName, new ArrayList()); - } - parents.add(intfName); + //finding port types with schema classes + if (portTypesWithSchemaClasses.has(portType)) { + //already present in the json so skipping + continue; + } + if (portType.equals("byte") || portType.equals("short") || portType.equals("char") || portType.equals("int") + || portType.equals("long") || portType.equals("float") || portType.equals("double") + || portType.equals("java.lang.String") || portType.equals("java.lang.Object")) { + //ignoring primitives, strings and object types as this information is needed only for complex types. + continue; + } + if (port.has("typeArgs")) { + //ignoring any type with generics + continue; + } + boolean hasSchemaClasses = false; + for (String descendant : typeGraph.getInstantiableDescendants(portType)) { + try { + if (typeGraph.isInstantiableBean(descendant)) { + hasSchemaClasses = true; + break; } - - // now store class=>parents mapping in portClassHier - portClassHier.put(portClazzName, parents); - - // walk up the hierarchy for the next iteration - portClazz = superClazz; + } catch (JSONException ex) { + LOG.warn("checking descendant is instantiable {}", descendant); } - } catch (ClassNotFoundException e) { - LOG.info("Could not make class from `{}`", type); } + portTypesWithSchemaClasses.put(portType, hasSchemaClasses); } } catch (JSONException e) { // should not reach this @@ -763,5 +855,4 @@ public class OperatorDiscoverer return typeGraph; } - } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java index b06bb76..61dc99d 100644 --- a/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java +++ b/engine/src/main/java/com/datatorrent/stram/webapp/TypeGraph.java @@ -1152,4 +1152,32 @@ public class TypeGraph return result; } + /** + * A utility method that tells whether a class is considered a bean.
+ * For simplicity we exclude classes that have any type-args. + * + * @param className name of the class + * @return true if it is a bean false otherwise. + */ + public boolean isInstantiableBean(String className) throws JSONException + { + JSONObject classDesc = describeClass(className); + if (classDesc.has("typeArgs")) { + //any type with generics is not considered a bean + return false; + } + JSONArray classProps = classDesc.optJSONArray("properties"); + if (classProps == null || classProps.length() == 0) { + //no properties then cannot be a bean + return false; + } + for (int p = 0; p < classProps.length(); p++) { + JSONObject propDesc = classProps.getJSONObject(p); + if (propDesc.optBoolean("canGet", false)) { + return true; + } + } + return false; + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java index c46fb5b..af12575 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java @@ -21,6 +21,8 @@ import java.io.StringWriter; import java.lang.reflect.Field; import java.util.*; +import javax.validation.ValidationException; + import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -682,6 +684,63 @@ public class LogicalPlanConfigurationTest { } } + @Test + public void testTupleClassAttr() throws Exception + { + String resourcePath = "/schemaTestTopology.json"; + InputStream is = this.getClass().getResourceAsStream(resourcePath); + if (is == null) { + fail("Could not load " + resourcePath); + } + StringWriter writer = new StringWriter(); + + IOUtils.copy(is, writer); + JSONObject json = new JSONObject(writer.toString()); + + Configuration conf = new Configuration(false); + + LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); + LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); + dag.validate(); + + OperatorMeta operator1 = dag.getOperatorMeta("operator1"); + assertEquals("operator1.classname", SchemaTestOperator.class, operator1.getOperator().getClass()); + + StreamMeta input1 = dag.getStream("inputStream"); + assertNotNull(input1); + for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) { + Assert.assertEquals("tuple class name required", TestSchema.class, targetPort.getAttributes().get(PortContext.TUPLE_CLASS)); + } + } + + @Test + public void testTupleClassAttrValidation() throws Exception + { + String resourcePath = "/schemaTestTopology.json"; + InputStream is = this.getClass().getResourceAsStream(resourcePath); + if (is == null) { + fail("Could not load " + resourcePath); + } + StringWriter writer = new StringWriter(); + + IOUtils.copy(is, writer); + JSONObject json = new JSONObject(writer.toString()); + + //removing schema so that validation fails + json.getJSONArray("streams").getJSONObject(0).remove("schema"); + Configuration conf = new Configuration(false); + + LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf); + LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson"); + + try { + dag.validate(); + Assert.fail(); + } catch (ValidationException ve) { + //test pass as validation exception was thrown. + } + } + private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class); public static class TestApplication implements StreamingApplication { @@ -789,7 +848,11 @@ public class LogicalPlanConfigurationTest { return false; return true; } - + + } + + public static class TestSchema + { } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java new file mode 100644 index 0000000..59aaade --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/plan/SchemaTestOperator.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datatorrent.stram.plan; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +import com.datatorrent.stram.engine.GenericTestOperator; + +public class SchemaTestOperator extends GenericTestOperator +{ + @InputPortFieldAnnotation(schemaRequired = true) + final public transient InputPort schemaRequiredPort = new DefaultInputPort() + { + @Override + final public void process(Object payload) + { + } + }; +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java index 8f8b632..ad915c8 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java @@ -41,6 +41,8 @@ import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.InputOperator; + import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration; @@ -170,7 +172,7 @@ public class OperatorDiscoveryTest OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(classFilePath); operatorDiscoverer.buildTypeGraph(); - // make sure (de)serialization of type graph works withtout problem + // make sure (de)serialization of type graph works without problem Kryo kryo = new Kryo(); TypeGraph.TypeGraphSerializer tgs = new TypeGraph.TypeGraphSerializer(); kryo.register(TypeGraph.class, tgs); @@ -1033,4 +1035,64 @@ public class OperatorDiscoveryTest } + public static class SchemaRequiredOperator extends BaseOperator implements InputOperator + { + @OutputPortFieldAnnotation(schemaRequired = true) + public final transient DefaultOutputPort output = new DefaultOutputPort(); + + @OutputPortFieldAnnotation(schemaRequired = false) + public final transient DefaultOutputPort output1 = new DefaultOutputPort(); + + public final transient DefaultOutputPort output2 = new DefaultOutputPort(); + + @Override + public void emitTuples() + { + } + } + + @Test + public void testPortSchema() throws Exception + { + String[] classFilePath = getClassFileInClasspath(); + OperatorDiscoverer od = new OperatorDiscoverer(classFilePath); + od.buildTypeGraph(); + JSONObject operatorJson = od.describeOperator(SchemaRequiredOperator.class); + JSONArray portsJson = operatorJson.getJSONArray("outputPorts"); + + Assert.assertEquals("no. of ports", 3, portsJson.length()); + + for (int i = 0; i < portsJson.length(); i++) { + JSONObject portJson = portsJson.getJSONObject(i); + String name = portJson.getString("name"); + if (name.equals("output")) { + Assert.assertEquals("output schema", true, portJson.getBoolean("schemaRequired")); + } else if (name.equals("output1")) { + Assert.assertEquals("output1 schema", false, portJson.getBoolean("schemaRequired")); + } else if (name.equals("output2")) { + Assert.assertEquals("output2 schema", false, portJson.getBoolean("schemaRequired")); + } + } + } + + @Test + public void testAdditionalPortInfo() throws Exception + { + String[] classFilePath = getClassFileInClasspath(); + OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(classFilePath); + operatorDiscoverer.buildTypeGraph(); + JSONObject operator = operatorDiscoverer.describeOperator(SubSubClassGeneric.class); + + JSONObject portClassHierarchy = new JSONObject(); + JSONObject portsWithSchemaClasses = new JSONObject(); + operatorDiscoverer.buildAdditionalPortInfo(operator, portClassHierarchy, portsWithSchemaClasses); + + JSONArray stringTypeArray = portClassHierarchy.optJSONArray("java.lang.String"); + Assert.assertNotNull("string hierarchy", stringTypeArray); + + Assert.assertEquals("number of immediate ancestors", 4, stringTypeArray.length()); + + Assert.assertEquals("number of port types with schema", 0, portsWithSchemaClasses.length()); + } } + http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61929b58/engine/src/test/resources/schemaTestTopology.json ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/schemaTestTopology.json b/engine/src/test/resources/schemaTestTopology.json new file mode 100644 index 0000000..6c779fd --- /dev/null +++ b/engine/src/test/resources/schemaTestTopology.json @@ -0,0 +1,43 @@ +{ + "operators": [ + { + "name": "inputOperator", + "class": "com.datatorrent.stram.engine.TestGeneratorInputOperator", + "properties": { + "com.datatorrent.stram.engine.TestGeneratorInputOperator": { + "myConfigProperty": "myConfigPropertyValue" + } + }, + "ports": [ + { + "name": "outport", + "attributes": { + "UNIFIER_LIMIT": 8 + } + } + ] + }, + { + "name": "operator1", + "class": "com.datatorrent.stram.plan.SchemaTestOperator" + } + ], + "streams": [ + { + "name": "inputStream", + "source": { + "operatorName": "inputOperator", + "portName": "outport" + }, + "sinks": [ + { + "operatorName": "operator1", + "portName": "schemaRequiredPort" + } + ], + "schema": { + "class": "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema" + } + } + ] +}