asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From buyin...@apache.org
Subject [01/16] asterixdb git commit: Add Asterix Extension Manager
Date Sat, 20 Aug 2016 06:15:43 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master a4815d353 -> ab81748ab


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java
index 4034d77..a1a89fe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java
@@ -18,15 +18,16 @@
  */
 package org.apache.asterix.om.base;
 
-import org.json.JSONException;
-import org.json.JSONObject;
+import java.io.Serializable;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.visitors.IOMVisitor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
-public class AString implements IAObject {
+public class AString implements IAObject, Serializable {
 
     protected String value;
 
@@ -50,8 +51,9 @@ public class AString implements IAObject {
 
     @Override
     public boolean equals(Object obj) {
-        if (!(obj instanceof AString))
+        if (!(obj instanceof AString)) {
             return false;
+        }
         return value.equals(((AString) obj).getStringValue());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
index 248ec3a..fc1108f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java
@@ -44,8 +44,8 @@ import org.json.JSONObject;
  */
 public class ARecordType extends AbstractComplexType {
 
-    public static final ARecordType FULLY_OPEN_RECORD_TYPE =
-            new ARecordType("OpenRecord", new String[0], new IAType[0], true);
+    public static final ARecordType FULLY_OPEN_RECORD_TYPE = new ARecordType("OpenRecord", new String[0], new IAType[0],
+            true);
 
     private static final long serialVersionUID = 1L;
     private final String[] fieldNames;
@@ -369,4 +369,14 @@ public class ARecordType extends AbstractComplexType {
         }
         return false;
     }
+
+    /**
+     * Create a fully open record type with the passed name
+     *
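+     * @param name the name of the record type to create
+     * @return a new open record type with the given name and no declared fields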
+     */
+    public static ARecordType createOpenRecordType(String name) {
+        return new ARecordType(name, new String[0], new IAType[0], true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index bb1e554..bf103fc 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -18,11 +18,13 @@
  */
 package org.apache.asterix.om.util;
 
+import java.io.IOException;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
+import org.apache.asterix.common.config.AsterixExtensionProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixFeedProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
@@ -59,12 +61,15 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
     private AsterixFeedProperties feedProperties;
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
+    private AsterixExtensionProperties extensionProperties;
     private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
     private final ILibraryManager libraryManager;
+    private Object extensionManager;
 
     public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
-            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) throws AsterixException {
+            IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager)
+            throws AsterixException, IOException {
         if (INSTANCE != null) {
             return;
         }
@@ -86,8 +91,9 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
         INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor);
         INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor);
         INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor);
-        INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor,
-                AsterixClusterProperties.INSTANCE.getCluster());
+        INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor);
+        INSTANCE.replicationProperties =
+                new AsterixReplicationProperties(propertiesAccessor, AsterixClusterProperties.INSTANCE.getCluster());
         INSTANCE.hcc = hcc;
         INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor);
         Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
@@ -173,4 +179,16 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
     public ILibraryManager getLibraryManager() {
         return libraryManager;
     }
+
+    public Object getExtensionManager() {
+        return extensionManager;
+    }
+
+    public void setExtensionManager(Object extensionManager) {
+        this.extensionManager = extensionManager;
+    }
+
+    public AsterixExtensionProperties getExtensionProperties() {
+        return extensionProperties;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
new file mode 100644
index 0000000..2d23058
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.job.listener;
+
+import java.util.List;
+
+import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.hyracks.api.context.IHyracksJobletContext;
+import org.apache.hyracks.api.job.IJobletEventListener;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+import org.apache.hyracks.api.job.JobStatus;
+
+/**
+ * This Joblet enables transactions on multiple datasets to take place in the same Hyracks Job.
+ * It takes a list of transaction job ids instead of a single job id.
+ */
+public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final List<JobId> jobIds;
+    private final boolean transactionalWrite;
+
+    public MultiTransactionJobletEventListenerFactory(List<JobId> jobIds, boolean transactionalWrite) {
+        this.jobIds = jobIds;
+        this.transactionalWrite = transactionalWrite;
+    }
+
+    @Override
+    public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) {
+
+        return new IJobletEventListener() {
+            @Override
+            public void jobletFinish(JobStatus jobStatus) {
+                try {
+                    ITransactionManager txnManager =
+                            ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
+                                    .getTransactionSubsystem().getTransactionManager();
+                    for (JobId jobId : jobIds) {
+                        ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false);
+                        txnContext.setWriteTxn(transactionalWrite);
+                        txnManager.completedTransaction(txnContext, new DatasetId(-1), -1,
+                                !(jobStatus == JobStatus.FAILURE));
+                    }
+                } catch (ACIDException e) {
+                    throw new Error(e);
+                }
+            }
+
+            @Override
+            public void jobletStart() {
+                try {
+                    for (JobId jobId : jobIds) {
+                        ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject())
+                                .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true);
+                    }
+                } catch (ACIDException e) {
+                    throw new Error(e);
+                }
+            }
+
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
index 696726f..003137d 100644
--- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
+++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java
@@ -18,6 +18,11 @@
  */
 package org.apache.asterix.server.test;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Logger;
+
 import org.apache.asterix.test.aql.TestExecutor;
 import org.apache.asterix.test.runtime.HDFSCluster;
 import org.apache.asterix.testframework.context.TestCaseContext;
@@ -34,11 +39,6 @@ import org.junit.runners.MethodSorters;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.logging.Logger;
-
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @RunWith(Parameterized.class)
 public class NCServiceExecutionIT {
@@ -47,7 +47,7 @@ public class NCServiceExecutionIT {
 
     // The "target" subdirectory of asterix-server. All outputs go here.
     private static final String TARGET_DIR = StringUtils
-            .join(new String[] { System.getProperty("basedir"), "target" }, File.separator);
+            .join(new String[] { "target" }, File.separator);
 
     // Directory where the NCs create and store all data, as configured by
     // src/test/resources/NCServiceExecutionIT/cc.conf.
@@ -75,7 +75,7 @@ public class NCServiceExecutionIT {
     // paths in "load" statements in test queries to find the right data. It is
     // also used for HDFSCluster.
     private static final String ASTERIX_APP_DIR = StringUtils
-            .join(new String[] { System.getProperty("basedir"), "..", "asterix-app" },
+            .join(new String[] { "..", "asterix-app" },
                     File.separator);
 
     // Path to the actual AQL test files, which we borrow from asterix-app. This is

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index a03bc30..7285a5a 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -356,6 +356,19 @@
                     <ignore />
                   </action>
                 </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>javacc-maven-plugin</artifactId>
+                    <versionRange>[2.6,)</versionRange>
+                    <goals>
+                      <goal>jjdoc</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
               </pluginExecutions>
             </lifecycleMappingMetadata>
           </configuration>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index 2f2b7e3..afcb86e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -20,7 +20,7 @@ package org.apache.hyracks.algebricks.core.algebra.functions;
 
 import java.io.Serializable;
 
-public final class FunctionIdentifier implements Serializable {
+public class FunctionIdentifier implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String namespace;
@@ -60,6 +60,7 @@ public final class FunctionIdentifier implements Serializable {
         return name.hashCode() + namespace.hashCode();
     }
 
+    @Override
     public String toString() {
         return getNamespace() + ":" + name;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
index bd40813..278a9d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 
 /**
@@ -25,11 +27,22 @@ import java.util.Set;
  */
 public interface IApplicationConfig {
     String getString(String section, String key);
+
     String getString(String section, String key, String defaultValue);
+
     int getInt(String section, String key);
+
     int getInt(String section, String key, int defaultValue);
+
     long getLong(String section, String key);
+
     long getLong(String section, String key, long defaultValue);
+
     Set<String> getSections();
+
     Set<String> getKeys(String section);
+
+    String[] getStringArray(String section, String key);
+
+    List<Set<Entry<String, String>>> getMultiSections(String section);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java
index acac345..214627e 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java
@@ -49,13 +49,17 @@ public final class ActivityId implements IWritable, Serializable {
         return odId;
     }
 
+    public void setOperatorDescriptorId(OperatorDescriptorId odId) {
+        this.odId = odId;
+    }
+
     public int getLocalId() {
         return id;
     }
 
     @Override
     public int hashCode() {
-        return (int) (odId.hashCode() + id);
+        return odId.hashCode() + id;
     }
 
     @Override
@@ -70,6 +74,7 @@ public final class ActivityId implements IWritable, Serializable {
         return other.odId.equals(odId) && other.id == id;
     }
 
+    @Override
     public String toString() {
         return "ANID:" + odId + ":" + id;
     }
@@ -78,8 +83,8 @@ public final class ActivityId implements IWritable, Serializable {
         if (str.startsWith("ANID:")) {
             str = str.substring(5);
             int idIdx = str.lastIndexOf(':');
-            return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str
-                    .substring(idIdx + 1)));
+            return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)),
+                    Integer.parseInt(str.substring(idIdx + 1)));
         }
         throw new IllegalArgumentException("Unable to parse: " + str);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
index 339eb9d..7219040 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java
@@ -68,7 +68,7 @@ public interface IConnectorDescriptor extends Serializable {
      */
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException;
+            throws HyracksDataException;
 
     /**
      * Factory metod to create the receive side reader that reads data from this
@@ -136,4 +136,9 @@ public interface IConnectorDescriptor extends Serializable {
      * @throws JSONException
      */
     public JSONObject toJSON() throws JSONException;
+
+    /**
+     * Sets the connector Id
+     */
+    public void setConnectorId(ConnectorDescriptorId cdId);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index 1b85fbd..26561e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -20,12 +20,11 @@ package org.apache.hyracks.api.dataflow;
 
 import java.io.Serializable;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.constraints.IConstraintAcceptor;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 /**
  * Descriptor for operators in Hyracks.
@@ -41,6 +40,13 @@ public interface IOperatorDescriptor extends Serializable {
     public OperatorDescriptorId getOperatorId();
 
     /**
+     * Sets the id of the operator.
+     *
+     * @param id the operator descriptor id to set on this operator
+     */
+    void setOperatorId(OperatorDescriptorId id);
+
+    /**
      * Returns the number of inputs into this operator.
      *
      * @return Number of inputs.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e4385bf..c95c31c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -26,6 +26,7 @@ public class ErrorCode {
     public static final int ERROR_PROCESSING_TUPLE = 0;
     public static final int INVALID_OPERATOR_OPERATION = 1;
     public static final int FAILURE_ON_NODE = 2;
+    public static final int ILLEGAL_ARGUMENT = 3;
 
     private ErrorCode() {
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 5165a62..7b44ff5 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -28,10 +28,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.constraints.Constraint;
 import org.apache.hyracks.api.constraints.expressions.ConstantExpression;
 import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression;
@@ -43,6 +39,9 @@ import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry {
     private static final long serialVersionUID = 1L;
@@ -98,7 +97,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         connMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
         opInputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
         opOutputMap = new HashMap<OperatorDescriptorId, List<IConnectorDescriptor>>();
-        connectorOpMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>>();
+        connectorOpMap = new HashMap<>();
         properties = new HashMap<String, Serializable>();
         userConstraints = new HashSet<Constraint>();
         operatorIdCounter = 0;
@@ -112,6 +111,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
     @Override
     public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) {
         OperatorDescriptorId odId = new OperatorDescriptorId(operatorIdCounter++);
+        op.setOperatorId(odId);
         opMap.put(odId, op);
         return odId;
     }
@@ -119,6 +119,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
     @Override
     public ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn) {
         ConnectorDescriptorId cdId = new ConnectorDescriptorId(connectorIdCounter++);
+        conn.setConnectorId(cdId);
         connMap.put(cdId, conn);
         return cdId;
     }
@@ -135,8 +136,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
             IOperatorDescriptor consumerOp, int consumerPort) {
         insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn);
         insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn);
-        connectorOpMap.put(
-                conn.getConnectorId(),
+        connectorOpMap.put(conn.getConnectorId(),
                 Pair.<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> of(
                         Pair.<IOperatorDescriptor, Integer> of(producerOp, producerPort),
                         Pair.<IOperatorDescriptor, Integer> of(consumerOp, consumerPort)));
@@ -166,20 +166,20 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
     }
 
     public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
-                .getConnectorId());
+        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
+                .get(conn.getConnectorId());
         return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()];
     }
 
     public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
-                .getConnectorId());
+        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
+                .get(conn.getConnectorId());
         return connInfo.getRight().getLeft();
     }
 
     public int getConsumerInputIndex(IConnectorDescriptor conn) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
-                .getConnectorId());
+        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
+                .get(conn.getConnectorId());
         return connInfo.getRight().getRight();
     }
 
@@ -220,14 +220,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
     }
 
     public IOperatorDescriptor getProducer(IConnectorDescriptor conn) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
-                .getConnectorId());
+        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
+                .get(conn.getConnectorId());
         return connInfo.getLeft().getLeft();
     }
 
     public int getProducerOutputIndex(IConnectorDescriptor conn) {
-        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap.get(conn
-                .getConnectorId());
+        Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connInfo = connectorOpMap
+                .get(conn.getConnectorId());
         return connInfo.getLeft().getRight();
     }
 
@@ -313,6 +313,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist
         vList.set(index, value);
     }
 
+    @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
index 22fe318..53db11c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java
@@ -18,11 +18,15 @@
  */
 package org.apache.hyracks.control.common.application;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.ini4j.Ini;
-
-import java.util.Set;
+import org.ini4j.Profile.Section;
 
 /**
  * An implementation of IApplicationConfig which is backed by Ini4j.
@@ -49,6 +53,11 @@ public class IniApplicationConfig implements IApplicationConfig {
     }
 
     @Override
+    public String[] getStringArray(String section, String key) {
+        return IniUtils.getStringArray(ini, section, key);
+    }
+
+    @Override
     public int getInt(String section, String key) {
         return IniUtils.getInt(ini, section, key, 0);
     }
@@ -60,7 +69,7 @@ public class IniApplicationConfig implements IApplicationConfig {
 
     @Override
     public long getLong(String section, String key) {
-        return IniUtils.getLong(ini, section, key, (long) 0);
+        return IniUtils.getLong(ini, section, key, 0);
     }
 
     @Override
@@ -77,4 +86,20 @@ public class IniApplicationConfig implements IApplicationConfig {
     public Set<String> getKeys(String section) {
         return ini.get(section).keySet();
     }
+
+    @Override
+    public List<Set<Map.Entry<String, String>>> getMultiSections(String section) {
+        List<Set<Map.Entry<String, String>>> list = new ArrayList<>();
+        List<Section> secs = getMulti(section);
+        if (secs != null) {
+            for (Section sec : secs) {
+                list.add(sec.entrySet());
+            }
+        }
+        return list;
+    }
+
+    private List<Section> getMulti(String section) {
+        return ini.getAll(section);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
index 538bb0b..c6c3e73 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java
@@ -18,11 +18,13 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
-import org.ini4j.Ini;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.lang.reflect.Array;
+
+import org.ini4j.Ini;
+import org.ini4j.Profile.Section;
 
 /**
  * Some utility functions for reading Ini4j objects with default values.
@@ -49,10 +51,27 @@ public class IniUtils {
         return (value != null) ? value : default_value;
     }
 
+    @SuppressWarnings("unchecked")
+    private static <T> T getIniArray(Ini ini, String section, String key, Class<T> clazz) {
+        Section sec = ini.get(section);
+        if (clazz.getComponentType() == null) {
+            return null;
+        }
+        if (sec == null) {
+            return (T) Array.newInstance(clazz.getComponentType(), 0);
+        } else {
+            return sec.getAll(key, clazz);
+        }
+    }
+
     public static String getString(Ini ini, String section, String key, String defaultValue) {
         return getIniValue(ini, section, key, defaultValue, String.class);
     }
 
+    public static String[] getStringArray(Ini ini, String section, String key) {
+        return getIniArray(ini, section, key, String[].class);
+    }
+
     public static int getInt(Ini ini, String section, String key, int defaultValue) {
         return getIniValue(ini, section, key, defaultValue, Integer.class);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
index 5caf477..e780ea0 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java
@@ -18,19 +18,18 @@
  */
 package org.apache.hyracks.dataflow.std.base;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.constraints.IConstraintAcceptor;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.job.ActivityCluster;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public abstract class AbstractConnectorDescriptor implements IConnectorDescriptor {
     private static final long serialVersionUID = 1L;
-    protected final ConnectorDescriptorId id;
+    protected ConnectorDescriptorId id;
 
     protected String displayName;
 
@@ -39,14 +38,17 @@ public abstract class AbstractConnectorDescriptor implements IConnectorDescripto
         displayName = getClass().getName() + "[" + id + "]";
     }
 
+    @Override
     public ConnectorDescriptorId getConnectorId() {
         return id;
     }
 
+    @Override
     public String getDisplayName() {
         return displayName;
     }
 
+    @Override
     public void setDisplayName(String displayName) {
         this.displayName = displayName;
     }
@@ -67,4 +69,9 @@ public abstract class AbstractConnectorDescriptor implements IConnectorDescripto
             ICCApplicationContext appCtx) {
         // do nothing
     }
+
+    @Override
+    public void setConnectorId(ConnectorDescriptorId id) {
+        this.id = id;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
index e930a2b..4f22a17 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java
@@ -18,20 +18,19 @@
  */
 package org.apache.hyracks.dataflow.std.base;
 
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.api.constraints.IConstraintAcceptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor {
     private static final long serialVersionUID = 1L;
 
-    protected final OperatorDescriptorId odId;
+    protected OperatorDescriptorId odId;
 
     protected String[] partitions;
 
@@ -57,6 +56,11 @@ public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor
     }
 
     @Override
+    public void setOperatorId(OperatorDescriptorId id) {
+        this.odId = id;
+    }
+
+    @Override
     public int getInputArity() {
         return inputArity;
     }
@@ -71,10 +75,12 @@ public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor
         return recordDescriptors;
     }
 
+    @Override
     public String getDisplayName() {
         return displayName;
     }
 
+    @Override
     public void setDisplayName(String displayName) {
         this.displayName = displayName;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
index 775c367..9437b00 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.base;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 
 public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements IActivity {
@@ -39,6 +40,14 @@ public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractO
     }
 
     @Override
+    public final void setOperatorId(OperatorDescriptorId id) {
+        super.setOperatorId(id);
+        if (activityNodeId != null && !activityNodeId.getOperatorDescriptorId().equals(odId)) {
+            activityNodeId.setOperatorDescriptorId(odId);
+        }
+    }
+
+    @Override
     public final void contributeActivities(IActivityGraphBuilder builder) {
         builder.addActivity(this, this);
         for (int i = 0; i < getInputArity(); ++i) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
index 44d77ac..b1cd83e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java
@@ -60,7 +60,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(),
                 nConsumerPartitions, localityMap, index);
     }
@@ -78,8 +78,9 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
             int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         BitSet expectedPartitions = new BitSet(nProducerPartitions);
         for (int i = 0; i < nProducerPartitions; i++) {
-            if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions))
+            if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions)) {
                 expectedPartitions.set(i);
+            }
         }
         NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
                 expectedPartitions);
@@ -87,5 +88,4 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT
         return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader,
                 channelReader);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
index 2ca70da..d748c8e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java
@@ -34,16 +34,17 @@ import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
 import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 
 public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor {
+
+    private static final long serialVersionUID = 1L;
+
     public MToNBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec) {
         super(spec);
     }
 
-    private static final long serialVersionUID = 1L;
-
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
         final boolean[] isOpen = new boolean[nConsumerPartitions];
         for (int i = 0; i < nConsumerPartitions; ++i) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
index 4872b95..d26b9ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java
@@ -45,7 +45,7 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
index 04de894..edcad42 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java
@@ -69,14 +69,14 @@ public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConn
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
-        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tpcf.createPartitioner());
+        final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc,
+                tpcf.createPartitioner());
         return hashWriter;
     }
 
     @Override
-    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
-            int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+    public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+            int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
         IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index e0c886f..f773918 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -48,7 +48,7 @@ public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor {
     @Override
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
-                    throws HyracksDataException {
+            throws HyracksDataException {
         return edwFactory.createFrameWriter(index);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 56af78e..67af861 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -80,16 +80,16 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
 
     @Override
     public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
-                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        SplitterMaterializerActivityNode sma =
+                new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
         builder.addActivity(this, sma);
         builder.addSourceEdge(0, sma, 0);
         int pipelineOutputIndex = 0;
         int activityId = MATERIALIZE_READER_ACTIVITY_ID;
         for (int i = 0; i < outputArity; i++) {
             if (outputMaterializationFlags[i]) {
-                MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
-                        new ActivityId(odId, activityId++));
+                MaterializeReaderActivityNode mra =
+                        new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
                 builder.addActivity(this, mra);
                 builder.addBlockingEdge(sma, mra);
                 builder.addTargetEdge(i, mra, 0);
@@ -139,6 +139,15 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
                 }
 
                 @Override
+                public void flush() throws HyracksDataException {
+                    if (!requiresMaterialization) {
+                        for (IFrameWriter writer : writers) {
+                            writer.flush();
+                        }
+                    }
+                }
+
+                @Override
                 public void close() throws HyracksDataException {
                     HyracksDataException hde = null;
                     try {
@@ -218,4 +227,5 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
             };
         }
     }
+
 }


Mime
View raw message