Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6F770200BE3 for ; Wed, 7 Dec 2016 21:59:43 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6E36F160B0C; Wed, 7 Dec 2016 20:59:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2694C160B2B for ; Wed, 7 Dec 2016 21:59:40 +0100 (CET) Received: (qmail 2141 invoked by uid 500); 7 Dec 2016 20:59:40 -0000 Mailing-List: contact commits-help@asterixdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.apache.org Delivered-To: mailing list commits@asterixdb.apache.org Received: (qmail 2015 invoked by uid 99); 7 Dec 2016 20:59:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 20:59:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 00E47F214E; Wed, 7 Dec 2016 20:59:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sjaco002@apache.org To: commits@asterixdb.apache.org Date: Wed, 07 Dec 2016 20:59:43 -0000 Message-Id: <35aecda3e1d5492f83feeaa8532b02bf@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/7] asterixdb-bad git commit: Updated to match code changes to asterix archived-at: Wed, 07 Dec 2016 20:59:43 -0000 http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java ---------------------------------------------------------------------- diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java new file mode 100644 index 0000000..527d65b --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseBrokersSearchKey.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.bad.metadata; + +import org.apache.asterix.metadata.MetadataNode; +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class DataverseBrokersSearchKey implements IExtensionMetadataSearchKey { + private static final long serialVersionUID = 1L; + private final String dataverse; + + public DataverseBrokersSearchKey(String dataverse) { + this.dataverse = dataverse; + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_BROKER_INDEX_ID; + } + + @Override + public ITupleReference getSearchKey() { + return MetadataNode.createTuple(dataverse); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java new file mode 100644 index 0000000..ffb3ab6 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseChannelsSearchKey.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.metadata; + +import org.apache.asterix.metadata.MetadataNode; +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class DataverseChannelsSearchKey implements IExtensionMetadataSearchKey { + private static final long serialVersionUID = 1L; + private final String dataverse; + + public DataverseChannelsSearchKey(String dataverse) { + this.dataverse = dataverse; + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_CHANNEL_INDEX_ID; + } + + @Override + public ITupleReference getSearchKey() { + return MetadataNode.createTuple(dataverse); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java new file mode 100644 index 0000000..b64bf1b --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.metadata; + +import java.util.List; + +import org.apache.asterix.active.EntityId; +import org.apache.asterix.bad.BADConstants; +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataEntity; + +public class Procedure implements IExtensionMetadataEntity { + private static final long serialVersionUID = 1L; + public static final String LANGUAGE_AQL = "AQL"; + public static final String LANGUAGE_JAVA = "JAVA"; + + public static final String RETURNTYPE_VOID = "VOID"; + public static final String NOT_APPLICABLE = "N/A"; + + private final EntityId procedureId; + private final int arity; + private final List params; + private final String body; + private final String returnType; + private final String language; + + public Procedure(String dataverseName, String functionName, int arity, List params, String returnType, + String functionBody, String language) { + this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName); + this.params = params; + this.body = functionBody; + this.returnType = returnType == null ? 
RETURNTYPE_VOID : returnType; + this.language = language; + this.arity = arity; + } + + public EntityId getEntityId() { + return procedureId; + } + + public List getParams() { + return params; + } + + public String getBody() { + return body; + } + + public String getReturnType() { + return returnType; + } + + public String getLanguage() { + return language; + } + + public int getArity() { + return arity; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof Procedure)) { + return false; + } + Procedure otherDataset = (Procedure) other; + if (!otherDataset.procedureId.equals(procedureId)) { + return false; + } + return true; + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java new file mode 100644 index 0000000..6456170 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureSearchKey.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.metadata; + +import org.apache.asterix.metadata.MetadataNode; +import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId; +import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +public class ProcedureSearchKey implements IExtensionMetadataSearchKey { + private static final long serialVersionUID = 1L; + private final String dataverse; + private final String channel; + private final String arity; + + public ProcedureSearchKey(String dataverse, String channel, String arity) { + this.dataverse = dataverse; + this.channel = channel; + this.arity = arity; + } + + @Override + public ExtensionMetadataDatasetId getDatasetId() { + return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID; + } + + @Override + public ITupleReference getSearchKey() { + return MetadataNode.createTuple(dataverse, channel, arity); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java new file mode 100644 index 0000000..f2eab9b --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.bad.metadata; + +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; +import org.apache.asterix.metadata.MetadataException; +import org.apache.asterix.metadata.entitytupletranslators.AbstractTupleTranslator; +import org.apache.asterix.om.base.AOrderedList; +import org.apache.asterix.om.base.ARecord; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.base.IACursor; +import org.apache.asterix.om.types.AOrderedListType; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; + +/** + * Translates a Procedure metadata entity to an ITupleReference and vice versa. + */ +public class ProcedureTupleTranslator extends AbstractTupleTranslator { + // Field indexes of serialized Procedure in a tuple. + // First key field. 
+ public static final int PROCEDURE_DATAVERSENAME_TUPLE_FIELD_INDEX = 0; + // Second key field. + public static final int PROCEDURE_PROCEDURE_NAME_TUPLE_FIELD_INDEX = 1; + // Third key field. + public static final int PROCEDURE_ARITY_TUPLE_FIELD_INDEX = 2; + + // Payload field containing serialized Procedure. + public static final int PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX = 3; + + @SuppressWarnings("unchecked") + private ISerializerDeserializer recordSerDes = SerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE); + + protected ProcedureTupleTranslator(boolean getTuple) { + super(getTuple, BADMetadataIndexes.NUM_FIELDS_PROCEDURE_IDX); + } + + @Override + public Procedure getMetadataEntityFromTuple(ITupleReference frameTuple) throws IOException { + byte[] serRecord = frameTuple.getFieldData(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX); + int recordStartOffset = frameTuple.getFieldStart(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX); + int recordLength = frameTuple.getFieldLength(PROCEDURE_PAYLOAD_TUPLE_FIELD_INDEX); + ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength); + DataInput in = new DataInputStream(stream); + ARecord procedureRecord = recordSerDes.deserialize(in); + return createProcedureFromARecord(procedureRecord); + } + + private Procedure createProcedureFromARecord(ARecord procedureRecord) { + String dataverseName = + ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX)) + .getStringValue(); + String procedureName = + ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX)) + .getStringValue(); + String arity = ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX)).getStringValue(); + + IACursor cursor = ((AOrderedList) procedureRecord + 
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX)).getCursor(); + List params = new ArrayList(); + while (cursor.next()) { + params.add(((AString) cursor.get()).getStringValue()); + } + + String returnType = ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX)) + .getStringValue(); + + String definition = ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX)) + .getStringValue(); + + String language = ((AString) procedureRecord + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX)) + .getStringValue(); + + return new Procedure(dataverseName, procedureName, Integer.parseInt(arity), params, returnType, definition, + language); + + } + + @Override + public ITupleReference getTupleFromMetadataEntity(Procedure procedure) throws IOException, MetadataException { + // write the key in the first 2 fields of the tuple + tupleBuilder.reset(); + aString.setValue(procedure.getEntityId().getDataverse()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + aString.setValue(procedure.getEntityId().getEntityName()); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + aString.setValue(procedure.getArity() + ""); + stringSerde.serialize(aString, tupleBuilder.getDataOutput()); + tupleBuilder.addFieldEndOffset(); + + // write the pay-load in the fourth field of the tuple + + recordBuilder.reset(BADMetadataRecordTypes.PROCEDURE_RECORDTYPE); + + // write field 0 + fieldValue.reset(); + aString.setValue(procedure.getEntityId().getDataverse()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_DATAVERSENAME_FIELD_INDEX, fieldValue); + + // write field 1 + fieldValue.reset(); + 
aString.setValue(procedure.getEntityId().getEntityName()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX, fieldValue); + + // write field 2 + fieldValue.reset(); + aString.setValue(procedure.getArity() + ""); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX, fieldValue); + + // write field 3 + OrderedListBuilder listBuilder = new OrderedListBuilder(); + ArrayBackedValueStorage itemValue = new ArrayBackedValueStorage(); + listBuilder.reset((AOrderedListType) BADMetadataRecordTypes.PROCEDURE_RECORDTYPE + .getFieldTypes()[BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX]); + for (String param : procedure.getParams()) { + itemValue.reset(); + aString.setValue(param); + stringSerde.serialize(aString, itemValue.getDataOutput()); + listBuilder.addItem(itemValue); + } + fieldValue.reset(); + listBuilder.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX, fieldValue); + + // write field 4 + fieldValue.reset(); + aString.setValue(procedure.getReturnType()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue); + + // write field 5 + fieldValue.reset(); + aString.setValue(procedure.getBody()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX, fieldValue); + + // write field 6 + fieldValue.reset(); + aString.setValue(procedure.getLanguage()); + stringSerde.serialize(aString, fieldValue.getDataOutput()); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX, fieldValue); + + // 
write record + recordBuilder.write(tupleBuilder.getDataOutput(), true); + tupleBuilder.addFieldEndOffset(); + + tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); + return tuple; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java new file mode 100644 index 0000000..89f0d20 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/rules/InsertBrokerNotifierForChannelRule.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.bad.rules; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.active.EntityId; +import org.apache.asterix.algebra.operators.CommitOperator; +import org.apache.asterix.bad.BADConstants; +import org.apache.asterix.bad.runtime.NotifyBrokerOperator; +import org.apache.asterix.bad.runtime.NotifyBrokerPOperator; +import org.apache.asterix.lang.common.util.FunctionUtil; +import org.apache.asterix.metadata.declared.DatasetDataSource; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.constants.AsterixConstantValue; +import org.apache.asterix.om.functions.AsterixBuiltinFunctions; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator; +import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; + +public class InsertBrokerNotifierForChannelRule implements IAlgebraicRewriteRule { + + @Override + public boolean rewritePre(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + return false; + } + + @Override + public boolean rewritePost(Mutable opRef, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue(); + if (op1.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) { + return false; + } + AbstractLogicalOperator op = (AbstractLogicalOperator) op1.getInputs().get(0).getValue(); + if (op.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR) { + return false; + } + DelegateOperator eOp = (DelegateOperator) op; + if (!(eOp.getDelegate() instanceof CommitOperator)) { + return false; + } + AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue(); + if (descendantOp.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) { + return false; + } + InsertDeleteUpsertOperator insertOp = (InsertDeleteUpsertOperator) descendantOp; + if 
(insertOp.getOperation() != InsertDeleteUpsertOperator.Kind.INSERT) { + return false; + } + DatasetDataSource dds = (DatasetDataSource) insertOp.getDataSource(); + String datasetName = dds.getDataset().getDatasetName(); + if (!dds.getDataset().getItemTypeDataverseName().equals("Metadata") + || !dds.getDataset().getItemTypeName().equals("ChannelResultsType") + || !datasetName.endsWith("Results")) { + return false; + } + String channelDataverse = dds.getDataset().getDataverseName(); + //Now we know that we are inserting into results + + String channelName = datasetName.substring(0, datasetName.length() - 7); + String subscriptionsName = channelName + "Subscriptions"; + //TODO: Can we check here to see if there is a channel with such a name? + + DataSourceScanOperator subscriptionsScan = (DataSourceScanOperator) findOp(op, subscriptionsName); + if (subscriptionsScan == null) { + return false; + } + + //Now we want to make sure and set the commit to be a nonsink commit + ((CommitOperator) eOp.getDelegate()).setSink(false); + + //Now we need to get the broker EndPoint + LogicalVariable brokerEndpointVar = context.newVar(); + AbstractLogicalOperator opAboveBrokersScan = findOp(op, "brokers"); + AssignOperator assignOp = createbrokerEndPointAssignOperator(brokerEndpointVar, opAboveBrokersScan); + //now brokerNameVar holds the brokerName for use farther up in the plan + + context.computeAndSetTypeEnvironmentForOperator(assignOp); + context.computeAndSetTypeEnvironmentForOperator(opAboveBrokersScan); + context.computeAndSetTypeEnvironmentForOperator(eOp); + + //get subscriptionIdVar + LogicalVariable subscriptionIdVar = subscriptionsScan.getVariables().get(0); + + //The channelExecutionTime is created just before the scan + LogicalVariable channelExecutionVar = ((AssignOperator) subscriptionsScan.getInputs().get(0).getValue()) + .getVariables().get(0); + + ProjectOperator badProject = (ProjectOperator) findOp(op, "project"); + 
badProject.getVariables().add(subscriptionIdVar); + badProject.getVariables().add(brokerEndpointVar); + badProject.getVariables().add(channelExecutionVar); + context.computeAndSetTypeEnvironmentForOperator(badProject); + + //Create my brokerNotify plan above the extension Operator + DelegateOperator dOp = createNotifyBrokerPlan(brokerEndpointVar, subscriptionIdVar, channelExecutionVar, + context, eOp, (DistributeResultOperator) op1, channelDataverse, channelName); + + opRef.setValue(dOp); + + return true; + } + + private DelegateOperator createNotifyBrokerPlan(LogicalVariable brokerEndpointVar, + LogicalVariable subscriptionIdVar, LogicalVariable channelExecutionVar, IOptimizationContext context, + ILogicalOperator eOp, DistributeResultOperator distributeOp, String channelDataverse, String channelName) + throws AlgebricksException { + //create the Distinct Op + ArrayList> expressions = new ArrayList>(); + VariableReferenceExpression vExpr = new VariableReferenceExpression(subscriptionIdVar); + expressions.add(new MutableObject(vExpr)); + DistinctOperator distinctOp = new DistinctOperator(expressions); + + //create the GroupBy Op + //And set the distinct as input + List>> groupByList = new ArrayList>>(); + List>> groupByDecorList = new ArrayList>>(); + List nestedPlans = new ArrayList(); + + //create group by operator + GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans); + groupbyOp.addGbyExpression(null, new VariableReferenceExpression(brokerEndpointVar)); + groupbyOp.addGbyExpression(null, new VariableReferenceExpression(channelExecutionVar)); + groupbyOp.getInputs().add(new MutableObject(distinctOp)); + + //create nested plan for subscription ids in group by + NestedTupleSourceOperator nestedTupleSourceOp = new NestedTupleSourceOperator( + new MutableObject(groupbyOp)); + //TODO: This is from translationcontext. 
It might be needed to make the variable exist outside of the subplan + //LogicalVariable subscriptionListVar = context.newSubplanOutputVar(); + LogicalVariable subscriptionListVar = context.newVar(); + List aggVars = new ArrayList(); + aggVars.add(subscriptionListVar); + AggregateFunctionCallExpression funAgg = AsterixBuiltinFunctions.makeAggregateFunctionExpression( + AsterixBuiltinFunctions.LISTIFY, new ArrayList>()); + funAgg.getArguments() + .add(new MutableObject(new VariableReferenceExpression(subscriptionIdVar))); + List> aggExpressions = new ArrayList>(); + aggExpressions.add(new MutableObject(funAgg)); + AggregateOperator listifyOp = new AggregateOperator(aggVars, aggExpressions); + listifyOp.getInputs().add(new MutableObject(nestedTupleSourceOp)); + + //add nested plans + nestedPlans.add(new ALogicalPlanImpl(new MutableObject(listifyOp))); + + //Create the NotifyBrokerOperator + NotifyBrokerOperator notifyBrokerOp = new NotifyBrokerOperator(brokerEndpointVar, subscriptionListVar, + channelExecutionVar); + EntityId activeId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, channelDataverse, channelName); + NotifyBrokerPOperator notifyBrokerPOp = new NotifyBrokerPOperator(activeId); + notifyBrokerOp.setPhysicalOperator(notifyBrokerPOp); + DelegateOperator extensionOp = new DelegateOperator(notifyBrokerOp); + extensionOp.setPhysicalOperator(notifyBrokerPOp); + extensionOp.getInputs().add(new MutableObject(groupbyOp)); + + //Set the input for the brokerNotify as the replicate operator + distinctOp.getInputs().add(new MutableObject(eOp)); + + //compute environment bottom up + + context.computeAndSetTypeEnvironmentForOperator(distinctOp); + context.computeAndSetTypeEnvironmentForOperator(groupbyOp); + context.computeAndSetTypeEnvironmentForOperator(nestedTupleSourceOp); + context.computeAndSetTypeEnvironmentForOperator(listifyOp); + context.computeAndSetTypeEnvironmentForOperator(extensionOp); + + return extensionOp; + + } + + @SuppressWarnings("unchecked") + 
private AssignOperator createbrokerEndPointAssignOperator(LogicalVariable brokerEndpointVar, + AbstractLogicalOperator opAboveBrokersScan) { + Mutable fieldRef = new MutableObject( + new ConstantExpression(new AsterixConstantValue(new AString(BADConstants.BrokerEndPoint)))); + DataSourceScanOperator brokerScan = null; + int index = 0; + for (Mutable subOp : opAboveBrokersScan.getInputs()) { + if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) { + brokerScan = (DataSourceScanOperator) subOp.getValue(); + break; + } + index++; + } + Mutable varRef = new MutableObject( + new VariableReferenceExpression(brokerScan.getVariables().get(2))); + + ScalarFunctionCallExpression fieldAccessByName = new ScalarFunctionCallExpression( + FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), varRef, fieldRef); + ArrayList varArray = new ArrayList(1); + varArray.add(brokerEndpointVar); + ArrayList> exprArray = new ArrayList>(1); + exprArray.add(new MutableObject(fieldAccessByName)); + + AssignOperator assignOp = new AssignOperator(varArray, exprArray); + + //Place assignOp between the scan and the op above it + assignOp.getInputs().add(new MutableObject(brokerScan)); + opAboveBrokersScan.getInputs().set(index, new MutableObject(assignOp)); + + return assignOp; + } + + /*This function searches for the needed op + * If lookingForBrokers, find the op above the brokers scan + * Else find the suscbriptionsScan + */ + private AbstractLogicalOperator findOp(AbstractLogicalOperator op, String lookingForString) { + if (!op.hasInputs()) { + return null; + } + for (Mutable subOp : op.getInputs()) { + if (lookingForString.equals("brokers")) { + if (isBrokerScan((AbstractLogicalOperator) subOp.getValue())) { + return op; + } else { + AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), + lookingForString); + if (nestedOp != null) { + return nestedOp; + } + } + + } else if (lookingForString.equals("project")) { + if 
(subOp.getValue().getOperatorTag() == LogicalOperatorTag.PROJECT) { + return (AbstractLogicalOperator) subOp.getValue(); + } else { + AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), + lookingForString); + if (nestedOp != null) { + return nestedOp; + } + } + } + + else { + if (isSubscriptionsScan((AbstractLogicalOperator) subOp.getValue(), lookingForString)) { + return (AbstractLogicalOperator) subOp.getValue(); + } else { + AbstractLogicalOperator nestedOp = findOp((AbstractLogicalOperator) subOp.getValue(), + lookingForString); + if (nestedOp != null) { + return nestedOp; + } + } + + } + } + return null; + } + + private boolean isBrokerScan(AbstractLogicalOperator op) { + if (op instanceof DataSourceScanOperator) { + if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) { + DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource(); + if (dds.getDataset().getDataverseName().equals("Metadata") + && dds.getDataset().getDatasetName().equals("Broker")) { + return true; + } + } + } + return false; + } + + private boolean isSubscriptionsScan(AbstractLogicalOperator op, String subscriptionsName) { + if (op instanceof DataSourceScanOperator) { + if (((DataSourceScanOperator) op).getDataSource() instanceof DatasetDataSource) { + DatasetDataSource dds = (DatasetDataSource) ((DataSourceScanOperator) op).getDataSource(); + if (dds.getDataset().getItemTypeDataverseName().equals("Metadata") + && dds.getDataset().getItemTypeName().equals("ChannelSubscriptionsType")) { + if (dds.getDataset().getDatasetName().equals(subscriptionsName)) { + return true; + } + } + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java ---------------------------------------------------------------------- diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java new file mode 100644 index 0000000..d281b49 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerOperator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.bad.runtime; + +import java.util.Collection; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDelegatedLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorDelegate; +import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform; + +/** + * Delegate logical operator that marks where brokers are notified. It holds the broker endpoint, + * subscription id and channel execution variables, reports all three as used variables and + * produces no variables of its own. + */ +public class NotifyBrokerOperator extends AbstractDelegatedLogicalOperator { + private final LogicalVariable subscriptionIdVar; + private final LogicalVariable brokerEndpointVar; + private final LogicalVariable channelExecutionVar; + + /** + * @param brokerEndpointVar variable returned by getBrokerEndpointVariable() + * @param subscriptionIdVar variable returned by getSubscriptionVariable() + * @param resultSetVar stored as the channel execution variable, returned by getChannelExecutionVariable() + */ + public NotifyBrokerOperator(LogicalVariable brokerEndpointVar, LogicalVariable subscriptionIdVar, + LogicalVariable resultSetVar) { + this.brokerEndpointVar = brokerEndpointVar; + this.subscriptionIdVar = subscriptionIdVar; + this.channelExecutionVar = resultSetVar; + } + + public LogicalVariable getSubscriptionVariable() { + return subscriptionIdVar; + } + + public LogicalVariable getBrokerEndpointVariable() { + return brokerEndpointVar; + } + + public LogicalVariable getChannelExecutionVariable() { + return channelExecutionVar; + } + + @Override + public String toString() { + return "notify-brokers"; + } + + @Override + public boolean isMap() { + return false; + } + + /** Returns a new operator that refers to the same three variables. */ + @Override + public IOperatorDelegate newInstance() { + return new NotifyBrokerOperator(brokerEndpointVar, subscriptionIdVar, channelExecutionVar); + } + + /** Always returns false; the transform is never applied. */ + @Override + public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) + throws AlgebricksException { + return false; + } + + @Override + public void getUsedVariables(Collection usedVars) { + usedVars.add(subscriptionIdVar); + usedVars.add(brokerEndpointVar); + 
usedVars.add(channelExecutionVar); + } + + @Override + public void getProducedVariables(Collection producedVars) { + // none produced + + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java new file mode 100644 index 0000000..12d5ae2 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerPOperator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.bad.runtime; + +import org.apache.asterix.active.EntityId; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; + +/** + * Physical operator for a delegate operator that wraps a {@link NotifyBrokerOperator}. During job generation it + * looks up the columns of the broker endpoint, subscription and channel execution variables in the input schema + * and contributes a {@link NotifyBrokerRuntimeFactory} micro operator for the entity id it was created with. + */ +public class NotifyBrokerPOperator extends AbstractPhysicalOperator { + + private final EntityId entityId; + + public NotifyBrokerPOperator(EntityId entityId) { + this.entityId = entityId; + } + + @Override + public PhysicalOperatorTag getOperatorTag() { + return PhysicalOperatorTag.DELEGATE_OPERATOR; + } + + @Override + public String toString() { + return "NOTIFY_BROKERS"; + } + + @Override + public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, + IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) 
{ + return emptyUnaryRequirements(); + } + + /** Delivers a clone of the physical properties delivered by the first input of op. */ + @Override + public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) + throws AlgebricksException { + AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue(); + deliveredProperties = op2.getDeliveredPhysicalProperties().clone(); + } + + /** + * Expects op to be a DelegateOperator whose delegate is a NotifyBrokerOperator; both are cast without a check. + * The three variables are resolved against inputSchemas[0] only, and a single graph edge from the first + * input of op is contributed. + */ + @Override + public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, + IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) + throws AlgebricksException { + DelegateOperator notify = (DelegateOperator) op; + LogicalVariable subVar = ((NotifyBrokerOperator) notify.getDelegate()).getSubscriptionVariable(); + LogicalVariable brokerVar = ((NotifyBrokerOperator) notify.getDelegate()).getBrokerEndpointVariable(); + LogicalVariable executionVar = ((NotifyBrokerOperator) notify.getDelegate()).getChannelExecutionVariable(); + + int brokerColumn = inputSchemas[0].findVariable(brokerVar); + int subColumn = inputSchemas[0].findVariable(subVar); + int executionColumn = inputSchemas[0].findVariable(executionVar); + + IScalarEvaluatorFactory brokerEvalFactory = new ColumnAccessEvalFactory(brokerColumn); + IScalarEvaluatorFactory subEvalFactory = new ColumnAccessEvalFactory(subColumn); + IScalarEvaluatorFactory channelExecutionEvalFactory = new ColumnAccessEvalFactory(executionColumn); + + NotifyBrokerRuntimeFactory runtime = new NotifyBrokerRuntimeFactory(brokerEvalFactory, subEvalFactory, + channelExecutionEvalFactory, entityId); + + RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, + context); + + builder.contributeMicroOperator(op, runtime, recDesc); + + // and contribute one edge from its child + ILogicalOperator src = op.getInputs().get(0).getValue(); + builder.contributeGraphEdge(src, 0, notify, 0); + } + + @Override + public boolean isMicroOperator() { + return true; + } + + 
@Override + public boolean expensiveThanMaterialization() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java new file mode 100644 index 0000000..8634e4c --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntime.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.bad.runtime; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.asterix.active.ActiveManager; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.bad.ChannelJobService; +import org.apache.asterix.common.api.IAsterixAppRuntimeContext; +import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer; +import org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer; +import org.apache.asterix.om.base.ADateTime; +import org.apache.asterix.om.base.AOrderedList; +import org.apache.asterix.om.base.AString; +import org.apache.asterix.om.types.AOrderedListType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; + +/** + * Push runtime that, for each tuple of an incoming frame, evaluates the broker endpoint, subscription list and + * channel execution time columns, deserializes them (string, ordered list of AUUID, datetime) and hands them to + * {@link ChannelJobService#sendBrokerNotificationsForChannel}. No tuples are written to an output in this class; + * open(), close() and flush() do nothing. + */ +public class NotifyBrokerRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + + private final ByteBufferInputStream bbis = new ByteBufferInputStream(); + private final DataInputStream di = new DataInputStream(bbis); + private final AOrderedListSerializerDeserializer subSerDes = new AOrderedListSerializerDeserializer( + new 
AOrderedListType(BuiltinType.AUUID, null)); + + private IPointable inputArg0 = new VoidPointable(); + private IPointable inputArg1 = new VoidPointable(); + private IPointable inputArg2 = new VoidPointable(); + /* eval0 reads the broker endpoint, eval1 the subscription list, eval2 the channel execution time (see constructor) */ + private IScalarEvaluator eval0; + private IScalarEvaluator eval1; + private IScalarEvaluator eval2; + /* assigned in the constructor; not read anywhere else in this class */ + private final ActiveManager activeManager; + private final EntityId entityId; + + /** + * Creates the three evaluators from their factories and fetches the ActiveManager from the application + * object of the joblet's application context (cast to IAsterixAppRuntimeContext without a check). + */ + public NotifyBrokerRuntime(IHyracksTaskContext ctx, IScalarEvaluatorFactory brokerEvalFactory, + IScalarEvaluatorFactory subEvalFactory, IScalarEvaluatorFactory channelExecutionEvalFactory, + EntityId activeJobId) throws HyracksDataException { + this.tRef = new FrameTupleReference(); + eval0 = brokerEvalFactory.createScalarEvaluator(ctx); + eval1 = subEvalFactory.createScalarEvaluator(ctx); + eval2 = channelExecutionEvalFactory.createScalarEvaluator(ctx); + this.activeManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext() + .getApplicationObject()).getActiveManager(); + this.entityId = activeJobId; + } + + @Override + public void open() throws HyracksDataException { + return; + } + + /** + * Sends one broker notification per tuple in buffer. An IOException raised while formatting the execution + * time is rethrown wrapped in a HyracksDataException. + */ + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + tRef.reset(tAccess, t); + + eval0.evaluate(tRef, inputArg0); + eval1.evaluate(tRef, inputArg1); + eval2.evaluate(tRef, inputArg2); + + /* + * NOTE(review): every value is read starting one byte after the pointable's start offset; presumably + * that byte is a type tag - confirm. The shared stream is re-pointed before each deserialize call. + */ + int serBrokerOffset = inputArg0.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serBrokerOffset + 1); + AString endpoint = AStringSerializerDeserializer.INSTANCE.deserialize(di); + + int serSubOffset = inputArg1.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), serSubOffset + 1); + AOrderedList subs = subSerDes.deserialize(di); + + int resultSetOffset = inputArg2.getStartOffset(); + bbis.setByteBuffer(tRef.getFrameTupleAccessor().getBuffer(), resultSetOffset + 1); + 
ADateTime executionTime = ADateTimeSerializerDeserializer.INSTANCE.deserialize(di); + String executionTimeString; + try { + executionTimeString = executionTime.toSimpleString(); + } catch (IOException e) { + throw new HyracksDataException(e); + } + + ChannelJobService.sendBrokerNotificationsForChannel(entityId, endpoint.getStringValue(), subs, + executionTimeString); + + } + + } + + @Override + public void close() throws HyracksDataException { + return; + } + + /** Stores the descriptor and rebuilds the frame tuple accessor from it; index is ignored. */ + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + this.inputRecordDesc = recordDescriptor; + this.tAccess = new FrameTupleAccessor(inputRecordDesc); + } + + @Override + public void flush() throws HyracksDataException { + return; + } + + @Override + public void fail() throws HyracksDataException { + failed = true; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java new file mode 100644 index 0000000..d5d05cf --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/NotifyBrokerRuntimeFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.bad.runtime; + +import org.apache.asterix.active.EntityId; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * Factory that holds the evaluator factories for the broker endpoint, subscription and channel execution + * columns together with an entity id, and creates a new {@link NotifyBrokerRuntime} from them for each + * task context it is given. + */ +public class NotifyBrokerRuntimeFactory implements IPushRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final IScalarEvaluatorFactory brokerEvalFactory; + private final IScalarEvaluatorFactory subEvalFactory; + private final IScalarEvaluatorFactory channelExecutionEvalFactory; + private final EntityId entityId; + + public NotifyBrokerRuntimeFactory(IScalarEvaluatorFactory brokerEvalFactory, IScalarEvaluatorFactory subEvalFactory, + IScalarEvaluatorFactory channelExecutionEvalFactory, EntityId entityId) { + this.brokerEvalFactory = brokerEvalFactory; + this.subEvalFactory = subEvalFactory; + this.channelExecutionEvalFactory = channelExecutionEvalFactory; + this.entityId = entityId; + } + + @Override + public String toString() { + return "notify-broker"; + } + + /** Passes ctx and the four stored values straight to the NotifyBrokerRuntime constructor. */ + @Override + public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + return new NotifyBrokerRuntime(ctx, brokerEvalFactory, subEvalFactory, channelExecutionEvalFactory, entityId); + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java new file mode 100644 index 0000000..8093977 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java @@ -0,0 +1,78 @@ +/* + * Copyright 2009-2013 by The Regents of the University of California + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * you may obtain a copy of the License from + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.asterix.bad.runtime; + +import java.util.logging.Logger; + +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.EntityId; +import org.apache.asterix.bad.BADConstants; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.IOperatorNodePushable; +import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; + +/** + * Operator descriptor for a repetitive channel. It builds the channel's EntityId from the dataverse and + * channel name and creates a {@link RepetitiveChannelOperatorNodePushable} per partition, handing it the + * channel job specification, the period string and the connection address and port. The periodic + * scheduling itself is not implemented in this class. + */ +public class RepetitiveChannelOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + + private static final long serialVersionUID = 1L; + + private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName()); + + /** The unique identifier of the job. 
**/ + protected final EntityId entityId; + + protected final JobSpecification jobSpec; + + private final String duration; + + private String strIP; + private int port; + + /** + * @param spec passed to the super constructor together with 0, 0 + * (NOTE(review): presumably zero inputs and zero outputs - confirm) + * @param dataverseName dataverse part of the entity id + * @param channelName name part of the entity id + * @param duration period string, handed unchanged to the node pushable + * @param channeljobSpec job specification handed to the node pushable + * @param strIP address handed to the node pushable + * @param port port handed to the node pushable + */ + public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String dataverseName, String channelName, + String duration, JobSpecification channeljobSpec, String strIP, int port) { + super(spec, 0, 0); + this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverseName, channelName); + this.jobSpec = channeljobSpec; + this.duration = duration; + this.strIP = strIP; + this.port = port; + } + + /** + * Creates the node pushable with a runtime id made of the entity id, the pushable's simple class name and + * the partition. recordDescProvider and nPartitions are not used. + */ + @Override + public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, + IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { + ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId, + RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition); + return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId, jobSpec, duration, strIP, port); + } + + public String getDuration() { + return duration; + } + + public EntityId getEntityId() { + return entityId; + } + + public JobSpecification getJobSpec() { + return jobSpec; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java new file mode 100644 index 0000000..1bbe331 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.runtime; + +import java.util.EnumSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import org.apache.asterix.active.ActiveRuntimeId; +import org.apache.asterix.active.ActiveSourceOperatorNodePushable; +import org.apache.asterix.bad.ChannelJobService; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.JobSpecification; + +/** + * Active source pushable that schedules the channel job through ChannelJobService.startJob and then blocks in + * start() until the returned scheduler has terminated; abort() shuts that scheduler down. + */ +public class RepetitiveChannelOperatorNodePushable extends ActiveSourceOperatorNodePushable { + + private static final Logger LOGGER = Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName()); + + /* + * Replaced in start() and read in abort(). start() blocks until termination, so abort() can only take effect + * from another thread; volatile makes the replaced scheduler visible to it. + */ + private volatile ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + private final JobSpecification jobSpec; + private long duration; + private final HyracksConnection hcc; + + /** + * Converts the period string with ChannelJobService.findPeriod and opens a HyracksConnection to strIP:port; + * any exception from opening the connection is rethrown wrapped in a HyracksDataException. + */ + public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx, ActiveRuntimeId runtimeId, + JobSpecification channeljobSpec, String duration, String strIP, int port) throws HyracksDataException { + super(ctx, 
runtimeId); + this.jobSpec = channeljobSpec; + this.duration = ChannelJobService.findPeriod(duration); + try { + hcc = new HyracksConnection(strIP, port); + } catch (Exception e) { + throw new HyracksDataException(e); + } + } + + + /** + * Starts the periodic job and waits for its scheduler to terminate. The wait parks the thread in + * awaitTermination instead of spinning on isTerminated(), so it does not burn a core and it reacts to + * interruption by throwing InterruptedException. + */ + @Override + protected void start() throws HyracksDataException, InterruptedException { + try { + scheduledExecutorService = + ChannelJobService.startJob(jobSpec, EnumSet.noneOf(JobFlag.class), null, hcc, duration); + } catch (Exception e) { + throw new HyracksDataException(e); + } + while (!scheduledExecutorService.awaitTermination(1, TimeUnit.MINUTES)) { + /* not terminated yet: keep waiting */ + } + + } + + /** Requests an orderly shutdown of the current scheduler, which lets start() return once it has terminated. */ + @Override + protected void abort() throws HyracksDataException, InterruptedException { + scheduledExecutorService.shutdown(); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/main/resources/lang-extension/lang.txt ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt b/asterix-bad/src/main/resources/lang-extension/lang.txt new file mode 100644 index 0000000..94b4c78 --- /dev/null +++ b/asterix-bad/src/main/resources/lang-extension/lang.txt @@ -0,0 +1,206 @@ +import org.apache.asterix.bad.lang.statement.BrokerDropStatement; +import org.apache.asterix.bad.lang.statement.ChannelDropStatement; +import org.apache.asterix.bad.lang.statement.ChannelSubscribeStatement; +import org.apache.asterix.bad.lang.statement.ChannelUnsubscribeStatement; +import org.apache.asterix.bad.lang.statement.CreateBrokerStatement; +import org.apache.asterix.bad.lang.statement.CreateChannelStatement; +import org.apache.asterix.bad.lang.statement.CreateProcedureStatement; + + +@merge +Statement SingleStatement() throws ParseException: +{ + // merge area 1 + before: + after: +} +{ + ( + // merge area 2 + before: + after: | stmt = ChannelSubscriptionStatement()) + { + // merge area 3 + } +} + +@merge +Statement CreateStatement() throws ParseException: +{ + // merge area 1 + before: + after: +} +{ + ( 
+ // merge area 2 + before: + after: | stmt = ChannelSpecification() | stmt = BrokerSpecification() | stmt = ProcedureSpecification()) + { + // merge area 3 + } +} + +@merge +Statement DropStatement() throws ParseException: +{ + // merge area 1 + before: + after: +} +{ + ( + // merge area 2 + before: + after: | "channel" pairId = QualifiedName() ifExists = IfExists() + { + stmt = new ChannelDropStatement(pairId.first, pairId.second, ifExists); + } + | "broker" pairId = QualifiedName() ifExists = IfExists() + { + stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists); + } + ) + { + // merge area 3 + } +} + +@new +CreateChannelStatement ChannelSpecification() throws ParseException: +{ + Pair nameComponents = null; + FunctionSignature appliedFunction = null; + CreateChannelStatement ccs = null; + String fqFunctionName = null; + Expression period = null; + boolean distributed = false; +} +{ + ( + "repetitive" "channel" nameComponents = QualifiedName() + appliedFunction = FunctionSignature() + "period" period = FunctionCallExpr() ("distributed" { distributed = true; })? 
+ { + ccs = new CreateChannelStatement(nameComponents.first, + nameComponents.second, appliedFunction, period, distributed); + } + ) + { + return ccs; + } +} + + +@new +CreateProcedureStatement ProcedureSpecification() throws ParseException: +{ + Pair nameComponents = null; + FunctionSignature signature; + List paramList = new ArrayList(); + String functionBody; + Token beginPos; + Token endPos; + Expression functionBodyExpr; +} +{ + "procedure" nameComponents = QualifiedName() + paramList = ParameterList() + + { + beginPos = token; + } + functionBodyExpr = Expression() + { + endPos = token; + functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn, endPos.beginLine, endPos.beginColumn); + signature = new FunctionSignature(nameComponents.first.toString(), nameComponents.second.toString(), paramList.size()); + removeCurrentScope(); + return new CreateProcedureStatement(signature, paramList, functionBody); + } +} + + + + +@new +CreateBrokerStatement BrokerSpecification() throws ParseException: +{ + CreateBrokerStatement cbs = null; + Pair name = null; + String endPoint = null; +} +{ + ( + "broker" name = QualifiedName() + endPoint = StringLiteral() + { + cbs = new CreateBrokerStatement(name.first, name.second,endPoint); + } + ) + { + return cbs; + } +} + +@new +Statement ChannelSubscriptionStatement() throws ParseException: +{ + Statement stmt = null; + Pair nameComponents = null; + List argList = new ArrayList(); + Expression tmp = null; + String id = null; + String subscriptionId = null; + Pair brokerName = null; +} +{ + ( + "subscribe" nameComponents = QualifiedName() + (tmp = Expression() + { + argList.add(tmp); + } + ( tmp = Expression() + { + argList.add(tmp); + } + )*)? 
brokerName = QualifiedName() + { + stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId); + } + | "unsubscribe" id = StringLiteral() nameComponents = QualifiedName() + { + setDataverses(new ArrayList()); + setDatasets(new ArrayList()); + VariableExpr varExp = new VariableExpr(); + VarIdentifier var = new VarIdentifier(); + varExp.setVar(var); + var.setValue("$subscriptionPlaceholder"); + getCurrentScope().addNewVarSymbolToScope(varExp.getVar()); + List dataverses = getDataverses(); + List datasets = getDatasets(); + // we remove the pointer to the dataverses and datasets + setDataverses(null); + setDatasets(null); + stmt = new ChannelUnsubscribeStatement(varExp, nameComponents.first, nameComponents.second, id, getVarCounter(), dataverses, datasets); + } + | "change" "subscription" subscriptionId = StringLiteral() nameComponents = QualifiedName() + (tmp = Expression() + { + argList.add(tmp); + } + ( tmp = Expression() + { + argList.add(tmp); + } + )*)? + brokerName = QualifiedName() + { + stmt = new ChannelSubscribeStatement(nameComponents.first, nameComponents.second, argList, getVarCounter(), brokerName.first, brokerName.second, subscriptionId); + } + ) + { + return stmt; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java new file mode 100644 index 0000000..77e8afe --- /dev/null +++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADExecutionTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.logging.Logger; + +import org.apache.asterix.common.config.AsterixTransactionProperties; +import org.apache.asterix.test.aql.TestExecutor; +import org.apache.asterix.test.runtime.ExecutionTestUtil; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.apache.asterix.testframework.xml.TestGroup; +import org.apache.commons.lang3.StringUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Runs the runtime test cases under 'src/test/resources/runtimets'. 
+ */ +@RunWith(Parameterized.class) +public class BADExecutionTest { + + protected static final Logger LOGGER = Logger.getLogger(BADExecutionTest.class.getName()); + + protected static final String PATH_ACTUAL = "target/rttest" + File.separator; + protected static final String PATH_BASE = StringUtils.join(new String[] { "src", "test", "resources", "runtimets" }, + File.separator); + + protected static final String TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml"; + + protected static AsterixTransactionProperties txnProperties; + private static final TestExecutor testExecutor = new TestExecutor(); + private static final boolean cleanupOnStart = true; + private static final boolean cleanupOnStop = true; + + protected static TestGroup FailedGroup; + + @BeforeClass + public static void setUp() throws Exception { + File outdir = new File(PATH_ACTUAL); + outdir.mkdirs(); + ExecutionTestUtil.setUp(cleanupOnStart, TEST_CONFIG_FILE_NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + ExecutionTestUtil.tearDown(cleanupOnStop); + ExecutionTestUtil.integrationUtil.removeTestStorageFiles(); + } + + @Parameters(name = "BADExecutionTest {index}: {0}") + public static Collection tests() throws Exception { + return buildTestsInXml("testsuite.xml"); + } + + protected static Collection buildTestsInXml(String xmlfile) throws Exception { + Collection testArgs = new ArrayList(); + TestCaseContext.Builder b = new TestCaseContext.Builder(); + for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) { + testArgs.add(new Object[] { ctx }); + } + return testArgs; + + } + + protected TestCaseContext tcCtx; + + public BADExecutionTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + testExecutor.executeTest(PATH_ACTUAL, tcCtx, null, false, FailedGroup); + } +} 
http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java new file mode 100644 index 0000000..ad2f1bf --- /dev/null +++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADOptimizerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.bad.test; + +import java.io.File; +import java.util.logging.Logger; + +import org.apache.asterix.bad.lang.BADCompilationProvider; +import org.apache.asterix.bad.lang.BADQueryTranslatorFactory; +import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.IdentitiyResolverFactory; +import org.apache.asterix.test.optimizer.OptimizerTest; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class BADOptimizerTest extends OptimizerTest { + + private static final Logger LOGGER = Logger.getLogger(BADOptimizerTest.class.getName()); + + @BeforeClass + public static void setUp() throws Exception { + TEST_CONFIG_FILE_NAME = "src/test/resources/conf/asterix-build-configuration.xml"; + System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME); + final File outdir = new File(PATH_ACTUAL); + outdir.mkdirs(); + + extensionLangCompilationProvider = new BADCompilationProvider(); + statementExecutorFactory = new BADQueryTranslatorFactory(); + + integrationUtil.init(true); + // Set the node resolver to be the identity resolver that expects node names + // to be node controller ids; a valid assumption in test environment. 
+ System.setProperty(ExternalDataConstants.NODE_RESOLVER_FACTORY_PROPERTY, + IdentitiyResolverFactory.class.getName()); + } + + public BADOptimizerTest(File queryFile, File expectedFile, File actualFile) { + super(queryFile, expectedFile, actualFile); + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml new file mode 100644 index 0000000..c2f5d41 --- /dev/null +++ b/asterix-bad/src/test/resources/conf/asterix-build-configuration.xml @@ -0,0 +1,110 @@ + + + asterix_nc1 + + asterix_nc1 + iodevice0,iodevice1 + + + asterix_nc2 + iodevice0,iodevice1 + + + asterix_nc1 + target/txnLogDir/asterix_nc1 + + + asterix_nc2 + target/txnLogDir/asterix_nc2 + + + + org.apache.asterix.bad.lang.BADQueryTranslatorExtension + + + org.apache.asterix.bad.lang.BADLangExtension + + + org.apache.asterix.bad.metadata.BADMetadataExtension + + + + max.wait.active.cluster + 60 + Maximum wait (in seconds) for a cluster to be ACTIVE (all + nodes are available) + before a submitted query/statement can be + executed. (Default = 60 seconds) + + + + log.level + WARNING + Log level for running tests/build + + + compiler.framesize + 32768 + + + compiler.sortmemory + 327680 + + + compiler.groupmemory + 163840 + + + compiler.joinmemory + 163840 + + + compiler.pregelix.home + ~/pregelix + + + storage.buffercache.pagesize + 32768 + The page size in bytes for pages in the buffer cache. + (Default = "32768" // 32KB) + + + + storage.buffercache.size + 33554432 + The size of memory allocated to the disk buffer cache. 
+ The value should be a multiple of the buffer cache page size(Default + = "33554432" // 32MB) + + + + storage.memorycomponent.numpages + 8 + The number of pages to allocate for a memory component. + (Default = 8) + + + + plot.activate + false + Enabling plot of Algebricks plan to tmp folder. (Default = false) + + + http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/cluster.xml ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/conf/cluster.xml b/asterix-bad/src/test/resources/conf/cluster.xml new file mode 100644 index 0000000..8f0b694 --- /dev/null +++ b/asterix-bad/src/test/resources/conf/cluster.xml @@ -0,0 +1,49 @@ + + + asterix + storage + + + false + 2016 + 2 + false + 30 + + + + master + 127.0.0.1 + 127.0.0.1 + 1098 + 1099 + 8888 + + + nc1 + 127.0.0.1 + 2016 + + + nc2 + 127.0.0.1 + 2017 + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/hyracks-deployment.properties ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/conf/hyracks-deployment.properties b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties new file mode 100644 index 0000000..17a6772 --- /dev/null +++ b/asterix-bad/src/test/resources/conf/hyracks-deployment.properties @@ -0,0 +1,21 @@ +#/* +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +cc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.CCBootstrapImpl +nc.bootstrap.class=org.apache.asterix.hyracks.bootstrap.NCBootstrapImpl +cc.ip=127.0.0.1 +cc.port=1098 http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/conf/test.properties ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/conf/test.properties b/asterix-bad/src/test/resources/conf/test.properties new file mode 100644 index 0000000..86269c8 --- /dev/null +++ b/asterix-bad/src/test/resources/conf/test.properties @@ -0,0 +1,22 @@ +#/* +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+MetadataNode=nc1 +NewUniverse=true +nc1.stores=nc1data +nc2.stores=nc2data +OutputDir=/tmp/asterix_output/ http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/d0ec8377/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql ---------------------------------------------------------------------- diff --git a/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql new file mode 100644 index 0000000..4dc9291 --- /dev/null +++ b/asterix-bad/src/test/resources/optimizerts/queries/channel/channel-create.aql @@ -0,0 +1,36 @@ +/* + * Description : Check the Plan used by a channel + * Expected Res : Success + * Date : Mar 2015 + */ + +drop dataverse channels if exists; +create dataverse channels; +use dataverse channels; + + +create type TweetMessageTypeuuid as closed { + tweetid: uuid, + sender-location: point, + send-time: datetime, + referred-topics: {{ string }}, + message-text: string, + countA: int32, + countB: int32 +} + + +create dataset TweetMessageuuids(TweetMessageTypeuuid) +primary key tweetid autogenerated; + +create function NearbyTweetsContainingText($location, $text) { + for $tweet in dataset TweetMessageuuids + let $circle := create-circle($location,30.0) + where contains($tweet.message-text,$text) + and spatial-intersect($tweet.sender-location, $location) + return $tweet.message-text +}; + +write output to nc1:"rttest/channel-create.adm"; + +create repetitive channel nearbyTweetChannel using NearbyTweetsContainingText@2 period duration("PT10M"); \ No newline at end of file