Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 242FA200B77 for ; Sat, 20 Aug 2016 08:15:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 22C44160ABC; Sat, 20 Aug 2016 06:15:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 488B4160AAB for ; Sat, 20 Aug 2016 08:15:44 +0200 (CEST) Received: (qmail 62182 invoked by uid 500); 20 Aug 2016 06:15:43 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 62169 invoked by uid 99); 20 Aug 2016 06:15:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Aug 2016 06:15:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 32157E05E1; Sat, 20 Aug 2016 06:15:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: buyingyi@apache.org To: commits@asterixdb.apache.org Date: Sat, 20 Aug 2016 06:15:43 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/16] asterixdb git commit: Add Asterix Extension Manager archived-at: Sat, 20 Aug 2016 06:15:46 -0000 Repository: asterixdb Updated Branches: refs/heads/master a4815d353 -> ab81748ab http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java index 4034d77..a1a89fe 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/base/AString.java @@ -18,15 +18,16 @@ */ package org.apache.asterix.om.base; -import org.json.JSONException; -import org.json.JSONObject; +import java.io.Serializable; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.visitors.IOMVisitor; +import org.json.JSONException; +import org.json.JSONObject; -public class AString implements IAObject { +public class AString implements IAObject, Serializable { protected String value; @@ -50,8 +51,9 @@ public class AString implements IAObject { @Override public boolean equals(Object obj) { - if (!(obj instanceof AString)) + if (!(obj instanceof AString)) { return false; + } return value.equals(((AString) obj).getStringValue()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java index 248ec3a..fc1108f 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ARecordType.java @@ -44,8 +44,8 @@ import org.json.JSONObject; */ public class ARecordType extends AbstractComplexType { - public static final ARecordType FULLY_OPEN_RECORD_TYPE = - new ARecordType("OpenRecord", new String[0], new IAType[0], true); + public static final ARecordType FULLY_OPEN_RECORD_TYPE = new ARecordType("OpenRecord", new String[0], new IAType[0], + true); private static final long serialVersionUID = 1L; private final String[] fieldNames; @@ -369,4 +369,14 @@ public class ARecordType extends AbstractComplexType { } return false; } + + /** + * Create a fully open record type with the passed name + * + * @param name + * @return + */ + public static ARecordType createOpenRecordType(String name) { + return new ARecordType(name, new String[0], new IAType[0], true); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java index bb1e554..bf103fc 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java @@ -18,11 +18,13 @@ */ package org.apache.asterix.om.util; +import java.io.IOException; import java.util.logging.Logger; import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger; import org.apache.asterix.common.config.AsterixBuildProperties; import org.apache.asterix.common.config.AsterixCompilerProperties; +import org.apache.asterix.common.config.AsterixExtensionProperties; import org.apache.asterix.common.config.AsterixExternalProperties; import org.apache.asterix.common.config.AsterixFeedProperties; import org.apache.asterix.common.config.AsterixMetadataProperties; @@ -59,12 +61,15 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA private AsterixFeedProperties feedProperties; private AsterixBuildProperties buildProperties; private AsterixReplicationProperties replicationProperties; + private AsterixExtensionProperties extensionProperties; private final IGlobalRecoveryMaanger globalRecoveryMaanger; private IHyracksClientConnection hcc; private final ILibraryManager libraryManager; + private Object extensionManager; public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc, - IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) throws AsterixException { + IGlobalRecoveryMaanger globalRecoveryMaanger, ILibraryManager libraryManager) + throws AsterixException, IOException { if (INSTANCE != null) { return; } @@ -86,8 +91,9 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA INSTANCE.storageProperties = new AsterixStorageProperties(propertiesAccessor); INSTANCE.txnProperties = new AsterixTransactionProperties(propertiesAccessor); INSTANCE.feedProperties = new AsterixFeedProperties(propertiesAccessor); - INSTANCE.replicationProperties = new AsterixReplicationProperties(propertiesAccessor, - AsterixClusterProperties.INSTANCE.getCluster()); + INSTANCE.extensionProperties = new AsterixExtensionProperties(propertiesAccessor); + INSTANCE.replicationProperties = + new AsterixReplicationProperties(propertiesAccessor, AsterixClusterProperties.INSTANCE.getCluster()); INSTANCE.hcc = hcc; INSTANCE.buildProperties = new AsterixBuildProperties(propertiesAccessor); Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel()); @@ -173,4 +179,16 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA public ILibraryManager getLibraryManager() { return libraryManager; } + + public Object getExtensionManager() { + return extensionManager; + } + + public void setExtensionManager(Object extensionManager) { + this.extensionManager = extensionManager; + } + + public AsterixExtensionProperties getExtensionProperties() { + return extensionProperties; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java new file mode 100644 index 0000000..2d23058 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.job.listener; + +import java.util.List; + +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.DatasetId; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.JobId; +import org.apache.hyracks.api.context.IHyracksJobletContext; +import org.apache.hyracks.api.job.IJobletEventListener; +import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobStatus; + +/** + * This Joblet enable transactions on multiple datasets to take place in the same Hyracks Job + * It takes a list of Transaction job ids instead of a single job Id + */ +public class MultiTransactionJobletEventListenerFactory implements IJobletEventListenerFactory { + + private static final long serialVersionUID = 1L; + private final List jobIds; + private final boolean transactionalWrite; + + public MultiTransactionJobletEventListenerFactory(List jobIds, boolean transactionalWrite) { + this.jobIds = jobIds; + this.transactionalWrite = transactionalWrite; + } + + @Override + public IJobletEventListener createListener(final IHyracksJobletContext jobletContext) { + + return new IJobletEventListener() { + @Override + public void jobletFinish(JobStatus jobStatus) { + try { + ITransactionManager txnManager = + ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject()) + .getTransactionSubsystem().getTransactionManager(); + for (JobId jobId : jobIds) { + ITransactionContext txnContext = txnManager.getTransactionContext(jobId, false); + txnContext.setWriteTxn(transactionalWrite); + txnManager.completedTransaction(txnContext, new DatasetId(-1), -1, + !(jobStatus == JobStatus.FAILURE)); + } + } catch (ACIDException e) { + throw new Error(e); + } + } + + @Override + public void jobletStart() { + try { + for (JobId jobId : jobIds) { + ((IAsterixAppRuntimeContext) jobletContext.getApplicationContext().getApplicationObject()) + .getTransactionSubsystem().getTransactionManager().getTransactionContext(jobId, true); + } + } catch (ACIDException e) { + throw new Error(e); + } + } + + }; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java index 696726f..003137d 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/NCServiceExecutionIT.java @@ -18,6 +18,11 @@ */ package org.apache.asterix.server.test; +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.logging.Logger; + import org.apache.asterix.test.aql.TestExecutor; import org.apache.asterix.test.runtime.HDFSCluster; import org.apache.asterix.testframework.context.TestCaseContext; @@ -34,11 +39,6 @@ import org.junit.runners.MethodSorters; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import java.io.File; -import java.util.ArrayList; -import java.util.Collection; -import java.util.logging.Logger; - @FixMethodOrder(MethodSorters.NAME_ASCENDING) @RunWith(Parameterized.class) public class NCServiceExecutionIT { @@ -47,7 +47,7 @@ public class NCServiceExecutionIT { // The "target" subdirectory of asterix-server. All outputs go here. private static final String TARGET_DIR = StringUtils - .join(new String[] { System.getProperty("basedir"), "target" }, File.separator); + .join(new String[] { "target" }, File.separator); // Directory where the NCs create and store all data, as configured by // src/test/resources/NCServiceExecutionIT/cc.conf. @@ -75,7 +75,7 @@ public class NCServiceExecutionIT { // paths in "load" statements in test queries to find the right data. It is // also used for HDFSCluster. private static final String ASTERIX_APP_DIR = StringUtils - .join(new String[] { System.getProperty("basedir"), "..", "asterix-app" }, + .join(new String[] { "..", "asterix-app" }, File.separator); // Path to the actual AQL test files, which we borrow from asterix-app. This is http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/asterixdb/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index a03bc30..7285a5a 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -356,6 +356,19 @@ + + + org.codehaus.mojo + javacc-maven-plugin + [2.6,) + + jjdoc + + + + + + http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java index 2f2b7e3..afcb86e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java @@ -20,7 +20,7 @@ package org.apache.hyracks.algebricks.core.algebra.functions; import java.io.Serializable; -public final class FunctionIdentifier implements Serializable { +public class FunctionIdentifier implements Serializable { private static final long serialVersionUID = 1L; private final String namespace; @@ -60,6 +60,7 @@ public final class FunctionIdentifier implements Serializable { return name.hashCode() + namespace.hashCode(); } + @Override public String toString() { return getNamespace() + ":" + name; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java index bd40813..278a9d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplicationConfig.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.api.application; +import java.util.List; +import java.util.Map.Entry; import java.util.Set; /** @@ -25,11 +27,22 @@ import java.util.Set; */ public interface IApplicationConfig { String getString(String section, String key); + String getString(String section, String key, String defaultValue); + int getInt(String section, String key); + int getInt(String section, String key, int defaultValue); + long getLong(String section, String key); + long getLong(String section, String key, long defaultValue); + Set getSections(); + Set getKeys(String section); + + String[] getStringArray(String section, String key); + + List>> getMultiSections(String section); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java index acac345..214627e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ActivityId.java @@ -49,13 +49,17 @@ public final class ActivityId implements IWritable, Serializable { return odId; } + public void setOperatorDescriptorId(OperatorDescriptorId odId) { + this.odId = odId; + } + public int getLocalId() { return id; } @Override public int hashCode() { - return (int) (odId.hashCode() + id); + return odId.hashCode() + id; } @Override @@ -70,6 +74,7 @@ public final class ActivityId implements IWritable, Serializable { return other.odId.equals(odId) && other.id == id; } + @Override public String toString() { return "ANID:" + odId + ":" + id; } @@ -78,8 +83,8 @@ public final class ActivityId implements IWritable, Serializable { if (str.startsWith("ANID:")) { str = str.substring(5); int idIdx = str.lastIndexOf(':'); - return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), Integer.parseInt(str - .substring(idIdx + 1))); + return new ActivityId(OperatorDescriptorId.parse(str.substring(0, idIdx)), + Integer.parseInt(str.substring(idIdx + 1))); } throw new IllegalArgumentException("Unable to parse: " + str); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java index 339eb9d..7219040 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IConnectorDescriptor.java @@ -68,7 +68,7 @@ public interface IConnectorDescriptor extends Serializable { */ public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException; + throws HyracksDataException; /** * Factory metod to create the receive side reader that reads data from this @@ -136,4 +136,9 @@ public interface IConnectorDescriptor extends Serializable { * @throws JSONException */ public JSONObject toJSON() throws JSONException; + + /** + * Sets the connector Id + */ + public void setConnectorId(ConnectorDescriptorId cdId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java index 1b85fbd..26561e6 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java @@ -20,12 +20,11 @@ package org.apache.hyracks.api.dataflow; import java.io.Serializable; -import org.json.JSONException; -import org.json.JSONObject; - import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.constraints.IConstraintAcceptor; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.json.JSONException; +import org.json.JSONObject; /** * Descriptor for operators in Hyracks. @@ -41,6 +40,13 @@ public interface IOperatorDescriptor extends Serializable { public OperatorDescriptorId getOperatorId(); /** + * Sets the id of the operator. + * + * @param id + */ + void setOperatorId(OperatorDescriptorId id); + + /** * Returns the number of inputs into this operator. * * @return Number of inputs. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index e4385bf..c95c31c 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -26,6 +26,7 @@ public class ErrorCode { public static final int ERROR_PROCESSING_TUPLE = 0; public static final int INVALID_OPERATOR_OPERATION = 1; public static final int FAILURE_ON_NODE = 2; + public static final int ILLEGAL_ARGUMENT = 3; private ErrorCode() { } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index 5165a62..7b44ff5 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -28,10 +28,6 @@ import java.util.Map; import java.util.Set; import org.apache.commons.lang3.tuple.Pair; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - import org.apache.hyracks.api.constraints.Constraint; import org.apache.hyracks.api.constraints.expressions.ConstantExpression; import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; @@ -43,6 +39,9 @@ import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.dataset.ResultSetId; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; public class JobSpecification implements Serializable, IOperatorDescriptorRegistry, IConnectorDescriptorRegistry { private static final long serialVersionUID = 1L; @@ -98,7 +97,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist connMap = new HashMap(); opInputMap = new HashMap>(); opOutputMap = new HashMap>(); - connectorOpMap = new HashMap, Pair>>(); + connectorOpMap = new HashMap<>(); properties = new HashMap(); userConstraints = new HashSet(); operatorIdCounter = 0; @@ -112,6 +111,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist @Override public OperatorDescriptorId createOperatorDescriptorId(IOperatorDescriptor op) { OperatorDescriptorId odId = new OperatorDescriptorId(operatorIdCounter++); + op.setOperatorId(odId); opMap.put(odId, op); return odId; } @@ -119,6 +119,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist @Override public ConnectorDescriptorId createConnectorDescriptor(IConnectorDescriptor conn) { ConnectorDescriptorId cdId = new ConnectorDescriptorId(connectorIdCounter++); + conn.setConnectorId(cdId); connMap.put(cdId, conn); return cdId; } @@ -135,8 +136,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist IOperatorDescriptor consumerOp, int consumerPort) { insertIntoIndexedMap(opInputMap, consumerOp.getOperatorId(), consumerPort, conn); insertIntoIndexedMap(opOutputMap, producerOp.getOperatorId(), producerPort, conn); - connectorOpMap.put( - conn.getConnectorId(), + connectorOpMap.put(conn.getConnectorId(), Pair., Pair> of( Pair. of(producerOp, producerPort), Pair. of(consumerOp, consumerPort))); @@ -166,20 +166,20 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist } public RecordDescriptor getConnectorRecordDescriptor(IConnectorDescriptor conn) { - Pair, Pair> connInfo = connectorOpMap.get(conn - .getConnectorId()); + Pair, Pair> connInfo = connectorOpMap + .get(conn.getConnectorId()); return connInfo.getLeft().getLeft().getOutputRecordDescriptors()[connInfo.getLeft().getRight()]; } public IOperatorDescriptor getConsumer(IConnectorDescriptor conn) { - Pair, Pair> connInfo = connectorOpMap.get(conn - .getConnectorId()); + Pair, Pair> connInfo = connectorOpMap + .get(conn.getConnectorId()); return connInfo.getRight().getLeft(); } public int getConsumerInputIndex(IConnectorDescriptor conn) { - Pair, Pair> connInfo = connectorOpMap.get(conn - .getConnectorId()); + Pair, Pair> connInfo = connectorOpMap + .get(conn.getConnectorId()); return connInfo.getRight().getRight(); } @@ -220,14 +220,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist } public IOperatorDescriptor getProducer(IConnectorDescriptor conn) { - Pair, Pair> connInfo = connectorOpMap.get(conn - .getConnectorId()); + Pair, Pair> connInfo = connectorOpMap + .get(conn.getConnectorId()); return connInfo.getLeft().getLeft(); } public int getProducerOutputIndex(IConnectorDescriptor conn) { - Pair, Pair> connInfo = connectorOpMap.get(conn - .getConnectorId()); + Pair, Pair> connInfo = connectorOpMap + .get(conn.getConnectorId()); return connInfo.getLeft().getRight(); } @@ -313,6 +313,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist vList.set(index, value); } + @Override public String toString() { StringBuilder buffer = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java index 22fe318..53db11c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/IniApplicationConfig.java @@ -18,11 +18,15 @@ */ package org.apache.hyracks.control.common.application; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.hyracks.api.application.IApplicationConfig; import org.apache.hyracks.control.common.controllers.IniUtils; import org.ini4j.Ini; - -import java.util.Set; +import org.ini4j.Profile.Section; /** * An implementation of IApplicationConfig which is backed by Ini4j. @@ -49,6 +53,11 @@ public class IniApplicationConfig implements IApplicationConfig { } @Override + public String[] getStringArray(String section, String key) { + return IniUtils.getStringArray(ini, section, key); + } + + @Override public int getInt(String section, String key) { return IniUtils.getInt(ini, section, key, 0); } @@ -60,7 +69,7 @@ public class IniApplicationConfig implements IApplicationConfig { @Override public long getLong(String section, String key) { - return IniUtils.getLong(ini, section, key, (long) 0); + return IniUtils.getLong(ini, section, key, 0); } @Override @@ -77,4 +86,20 @@ public class IniApplicationConfig implements IApplicationConfig { public Set getKeys(String section) { return ini.get(section).keySet(); } + + @Override + public List>> getMultiSections(String section) { + List>> list = new ArrayList<>(); + List
secs = getMulti(section); + if (secs != null) { + for (Section sec : secs) { + list.add(sec.entrySet()); + } + } + return list; + } + + private List
getMulti(String section) { + return ini.getAll(section); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java index 538bb0b..c6c3e73 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/IniUtils.java @@ -18,11 +18,13 @@ */ package org.apache.hyracks.control.common.controllers; -import org.ini4j.Ini; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Array; + +import org.ini4j.Ini; +import org.ini4j.Profile.Section; /** * Some utility functions for reading Ini4j objects with default values. @@ -49,10 +51,27 @@ public class IniUtils { return (value != null) ? value : default_value; } + @SuppressWarnings("unchecked") + private static T getIniArray(Ini ini, String section, String key, Class clazz) { + Section sec = ini.get(section); + if (clazz.getComponentType() == null) { + return null; + } + if (sec == null) { + return (T) Array.newInstance(clazz.getComponentType(), 0); + } else { + return sec.getAll(key, clazz); + } + } + public static String getString(Ini ini, String section, String key, String defaultValue) { return getIniValue(ini, section, key, defaultValue, String.class); } + public static String[] getStringArray(Ini ini, String section, String key) { + return getIniArray(ini, section, key, String[].class); + } + public static int getInt(Ini ini, String section, String key, int defaultValue) { return getIniValue(ini, section, key, defaultValue, Integer.class); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java index 5caf477..e780ea0 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractConnectorDescriptor.java @@ -18,19 +18,18 @@ */ package org.apache.hyracks.dataflow.std.base; -import org.json.JSONException; -import org.json.JSONObject; - import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.constraints.IConstraintAcceptor; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.job.ActivityCluster; import org.apache.hyracks.api.job.IConnectorDescriptorRegistry; +import org.json.JSONException; +import org.json.JSONObject; public abstract class AbstractConnectorDescriptor implements IConnectorDescriptor { private static final long serialVersionUID = 1L; - protected final ConnectorDescriptorId id; + protected ConnectorDescriptorId id; protected String displayName; @@ -39,14 +38,17 @@ public abstract class AbstractConnectorDescriptor implements IConnectorDescripto displayName = getClass().getName() + "[" + id + "]"; } + @Override public ConnectorDescriptorId getConnectorId() { return id; } + @Override public String getDisplayName() { return displayName; } + @Override public void setDisplayName(String displayName) { this.displayName = displayName; } @@ -67,4 +69,9 @@ public abstract class AbstractConnectorDescriptor implements IConnectorDescripto ICCApplicationContext appCtx) { // do nothing } + + @Override + public void setConnectorId(ConnectorDescriptorId id) { + this.id = id; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java index e930a2b..4f22a17 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractOperatorDescriptor.java @@ -18,20 +18,19 @@ */ package org.apache.hyracks.dataflow.std.base; -import org.json.JSONException; -import org.json.JSONObject; - import org.apache.hyracks.api.application.ICCApplicationContext; import org.apache.hyracks.api.constraints.IConstraintAcceptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.json.JSONException; +import org.json.JSONObject; public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor { private static final long serialVersionUID = 1L; - protected final OperatorDescriptorId odId; + protected OperatorDescriptorId odId; protected String[] partitions; @@ -57,6 +56,11 @@ public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor } @Override + public void setOperatorId(OperatorDescriptorId id) { + this.odId = id; + } + + @Override public int getInputArity() { return inputArity; } @@ -71,10 +75,12 @@ public abstract class AbstractOperatorDescriptor implements IOperatorDescriptor return recordDescriptors; } + @Override public String getDisplayName() { return displayName; } + @Override public void setDisplayName(String displayName) { this.displayName = displayName; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java index 775c367..9437b00 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java @@ -21,6 +21,7 @@ package org.apache.hyracks.dataflow.std.base; import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.IActivity; import org.apache.hyracks.api.dataflow.IActivityGraphBuilder; +import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements IActivity { @@ -39,6 +40,14 @@ public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractO } @Override + public final void setOperatorId(OperatorDescriptorId id) { + super.setOperatorId(id); + if (activityNodeId != null && !activityNodeId.getOperatorDescriptorId().equals(odId)) { + activityNodeId.setOperatorDescriptorId(odId); + } + } + + @Override public final void contributeActivities(IActivityGraphBuilder builder) { builder.addActivity(this, this); for (int i = 0; i < getInputArity(); ++i) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java index 44d77ac..b1cd83e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwareMToNPartitioningConnectorDescriptor.java @@ -60,7 +60,7 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws HyracksDataException { return new LocalityAwarePartitionDataWriter(ctx, edwFactory, recordDesc, tpcf.createPartitioner(), nConsumerPartitions, localityMap, index); } @@ -78,8 +78,9 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT int receiverIndex, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { BitSet expectedPartitions = new BitSet(nProducerPartitions); for (int i = 0; i < nProducerPartitions; i++) { - if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions)) + if (localityMap.isConnected(i, receiverIndex, nConsumerPartitions)) { expectedPartitions.set(i); + } } NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions, expectedPartitions); @@ -87,5 +88,4 @@ public class LocalityAwareMToNPartitioningConnectorDescriptor extends AbstractMT return new PartitionCollector(ctx, getConnectorId(), receiverIndex, expectedPartitions, frameReader, channelReader); } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java index 2ca70da..d748c8e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNBroadcastConnectorDescriptor.java @@ -34,16 +34,17 @@ import org.apache.hyracks.dataflow.std.collectors.NonDeterministicFrameReader; import org.apache.hyracks.dataflow.std.collectors.PartitionCollector; public class MToNBroadcastConnectorDescriptor extends AbstractMToNConnectorDescriptor { + + private static final long serialVersionUID = 1L; + public MToNBroadcastConnectorDescriptor(IConnectorDescriptorRegistry spec) { super(spec); } - private static final long serialVersionUID = 1L; - @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws HyracksDataException { final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions]; final boolean[] isOpen = new boolean[nConsumerPartitions]; for (int i = 0; i < nConsumerPartitions; ++i) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java index 4872b95..d26b9ef 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningConnectorDescriptor.java @@ -45,7 +45,7 @@ public class MToNPartitioningConnectorDescriptor extends AbstractMToNConnectorDe @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws HyracksDataException { return new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tpcf.createPartitioner()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java index 04de894..edcad42 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNPartitioningMergingConnectorDescriptor.java @@ -69,14 +69,14 @@ public class MToNPartitioningMergingConnectorDescriptor extends AbstractMToNConn public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { - final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, - recordDesc, tpcf.createPartitioner()); + final PartitionDataWriter hashWriter = new PartitionDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, + tpcf.createPartitioner()); return hashWriter; } @Override - public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, - int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { + public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index, + int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException { IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length]; for (int i = 0; i < comparatorFactories.length; ++i) { comparators[i] = comparatorFactories[i].createBinaryComparator(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java index e0c886f..f773918 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java @@ -48,7 +48,7 @@ public class OneToOneConnectorDescriptor extends AbstractConnectorDescriptor { @Override public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc, IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions) - throws HyracksDataException { + throws HyracksDataException { return edwFactory.createFrameWriter(index); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ab81748a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java index 56af78e..67af861 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java @@ -80,16 +80,16 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { @Override public void contributeActivities(IActivityGraphBuilder builder) { - SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode( - new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); + SplitterMaterializerActivityNode sma = + new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID)); builder.addActivity(this, sma); builder.addSourceEdge(0, sma, 0); int pipelineOutputIndex = 0; int activityId = MATERIALIZE_READER_ACTIVITY_ID; for (int i = 0; i < outputArity; i++) { if (outputMaterializationFlags[i]) { - MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode( - new ActivityId(odId, activityId++)); + MaterializeReaderActivityNode mra = + new MaterializeReaderActivityNode(new ActivityId(odId, activityId++)); builder.addActivity(this, mra); builder.addBlockingEdge(sma, mra); builder.addTargetEdge(i, mra, 0); @@ -139,6 +139,15 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { } @Override + public void flush() throws HyracksDataException { + if (!requiresMaterialization) { + for (IFrameWriter writer : writers) { + writer.flush(); + } + } + } + + @Override public void close() throws HyracksDataException { HyracksDataException hde = null; try { @@ -218,4 +227,5 @@ public class SplitOperatorDescriptor extends AbstractOperatorDescriptor { }; } } + }