Return-Path: X-Original-To: apmail-asterixdb-commits-archive@minotaur.apache.org Delivered-To: apmail-asterixdb-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0273D18178 for ; Sun, 3 Jan 2016 17:41:22 +0000 (UTC) Received: (qmail 40930 invoked by uid 500); 3 Jan 2016 17:41:21 -0000 Delivered-To: apmail-asterixdb-commits-archive@asterixdb.apache.org Received: (qmail 40895 invoked by uid 500); 3 Jan 2016 17:41:21 -0000 Mailing-List: contact commits-help@asterixdb.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@asterixdb.incubator.apache.org Delivered-To: mailing list commits@asterixdb.incubator.apache.org Received: (qmail 40884 invoked by uid 99); 3 Jan 2016 17:41:21 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2016 17:41:21 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 4EDF7180047 for ; Sun, 3 Jan 2016 17:41:21 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.227 X-Spam-Level: * X-Spam-Status: No, score=1.227 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.554, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id rORnuhXEJ5hh for ; Sun, 3 Jan 2016 17:41:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id B6E3C23053 for ; Sun, 3 Jan 2016 17:40:59 +0000 (UTC) Received: (qmail 39833 invoked by uid 99); 3 Jan 2016 17:40:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2016 17:40:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69B56E01F2; Sun, 3 Jan 2016 17:40:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: amoudi@apache.org To: commits@asterixdb.incubator.apache.org Date: Sun, 03 Jan 2016 17:41:14 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/21] incubator-asterixdb git commit: First stage of external data cleanup http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java new file mode 100644 index 0000000..07d09db --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RCRecordIdReader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.indexing; + +import org.apache.asterix.om.base.AInt32; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class RCRecordIdReader extends RecordIdReader { + + public RCRecordIdReader(int[] ridFields) { + super(ridFields); + } + + @Override + public RecordId read(int index) throws HyracksDataException { + if (super.read(index) == null) { + return null; + } + // Get row number + bbis.setByteBuffer(frameBuffer, tupleStartOffset + + tupleAccessor.getFieldStartOffset(index, ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX])); + rid.setRow( + ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.ROW_NUMBER_FIELD_INDEX]].deserialize(dis)) + .getIntegerValue()); + return rid; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java new file mode 100644 index 0000000..14235c0 --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordColumnarIndexer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.indexing; + +import java.io.IOException; + +import org.apache.asterix.external.api.IExternalIndexer; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.input.record.reader.HDFSRecordReader; +import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; +import org.apache.asterix.om.base.AMutableInt32; +import org.apache.asterix.om.base.AMutableInt64; +import org.apache.asterix.om.base.IAObject; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; + +public class RecordColumnarIndexer implements IExternalIndexer { + + private static final long serialVersionUID = 1L; + public static final int NUM_OF_FIELDS = 3; + protected AMutableInt32 fileNumber = new AMutableInt32(0); + protected AMutableInt64 offset = new AMutableInt64(0); + protected long nextOffset; + protected AMutableInt32 rowNumber = new AMutableInt32(0); + protected RecordReader recordReader; + + @SuppressWarnings("unchecked") + private ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.AINT32); + @SuppressWarnings("unchecked") + private ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE + .getSerializerDeserializer(BuiltinType.AINT64); + + @Override + public void reset(IRecordReader reader) throws IOException { + //TODO: Make this more generic. right now, it works because we only index hdfs files. + @SuppressWarnings("unchecked") + HDFSRecordReader hdfsReader = (HDFSRecordReader) reader; + fileNumber.setValue(hdfsReader.getSnapshot().get(hdfsReader.getCurrentSplitIndex()).getFileNumber()); + recordReader = hdfsReader.getReader(); + offset.setValue(recordReader.getPos()); + nextOffset = offset.getLongValue(); + rowNumber.setValue(0); + } + + @Override + public void index(ArrayTupleBuilder tb) throws IOException { + if (recordReader.getPos() != nextOffset) { + // start of a new group + offset.setValue(nextOffset); + nextOffset = recordReader.getPos(); + rowNumber.setValue(0); + } + tb.addField(intSerde, fileNumber); + tb.addField(longSerde, offset); + tb.addField(intSerde, rowNumber); + rowNumber.setValue(rowNumber.getIntegerValue() + 1); + } + + @Override + public int getNumberOfFields() { + return NUM_OF_FIELDS; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java new file mode 100644 index 0000000..9027101 --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordId.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.indexing; + +public class RecordId { + public static enum RecordIdType { + OFFSET, + RC + } + + private int fileId; + private long offset; + private int row; + + public int getFileId() { + return fileId; + } + + public void setFileId(int fileId) { + this.fileId = fileId; + } + + public long getOffset() { + return offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public int getRow() { + return row; + } + + public void setRow(int row) { + this.row = row; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java new file mode 100644 index 0000000..2b4cc9c --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.indexing; + +import java.io.DataInputStream; +import java.nio.ByteBuffer; + +import org.apache.asterix.om.base.AInt32; +import org.apache.asterix.om.base.AInt64; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; + +public class RecordIdReader { + + private final static byte nullByte = ATypeTag.NULL.serialize(); + protected FrameTupleAccessor tupleAccessor; + protected int fieldSlotsLength; + protected int[] ridFields; + protected RecordId rid; + protected RecordDescriptor inRecDesc; + protected ByteBufferInputStream bbis; + protected DataInputStream dis; + protected int tupleStartOffset; + protected ByteBuffer frameBuffer; + + public RecordIdReader(int[] ridFields) { + this.ridFields = ridFields; + this.rid = new RecordId(); + } + + public void set(FrameTupleAccessor accessor, RecordDescriptor inRecDesc) { + this.tupleAccessor = accessor; + this.fieldSlotsLength = accessor.getFieldSlotsLength(); + this.inRecDesc = inRecDesc; + this.bbis = new ByteBufferInputStream(); + this.dis = new DataInputStream(bbis); + } + + public RecordId read(int index) throws HyracksDataException { + tupleStartOffset = tupleAccessor.getTupleStartOffset(index) + fieldSlotsLength; + int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(index, + ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]); + frameBuffer = tupleAccessor.getBuffer(); + if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) { + return null; + } + // Get file number + bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset); + rid.setFileId( + ((AInt32) inRecDesc.getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis)) + .getIntegerValue()); + // Get record group offset + bbis.setByteBuffer(frameBuffer, tupleStartOffset + + tupleAccessor.getFieldStartOffset(index, ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX])); + rid.setOffset(((AInt64) inRecDesc.getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]] + .deserialize(dis)).getLongValue()); + return rid; + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java new file mode 100644 index 0000000..d0bf2ff --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/RecordIdReaderFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.indexing; + +import java.util.Map; + +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.external.util.HDFSUtils; + +public class RecordIdReaderFactory { + + public static RecordIdReader create(Map configuration, int[] ridFields) throws AsterixException { + switch (HDFSUtils.getRecordIdType(configuration)) { + case OFFSET: + return new RecordIdReader(ridFields); + case RC: + return new RCRecordIdReader(ridFields); + default: + throw new AsterixException("Unknown Record Id type: " + HDFSUtils.getRecordIdType(configuration)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java deleted file mode 100644 index 07e09bd..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AbstractIndexingTupleParser.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.DataOutput; -import java.io.InputStream; - -import org.apache.asterix.external.indexing.input.AbstractHDFSReader; -import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.std.file.ITupleParser; - -public abstract class AbstractIndexingTupleParser implements ITupleParser{ - - protected ArrayTupleBuilder tb; - protected DataOutput dos; - protected final FrameTupleAppender appender; - protected final ARecordType recType; - protected final IHyracksCommonContext ctx; - protected final IAsterixHDFSRecordParser deserializer; - protected final AMutableInt32 aMutableInt = new AMutableInt32(0); - protected final AMutableInt64 aMutableLong = new AMutableInt64(0); - - @SuppressWarnings("rawtypes") - protected final ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32); - @SuppressWarnings("rawtypes") - protected final ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64); - - public AbstractIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser - deserializer) throws HyracksDataException { - appender = new FrameTupleAppender(new VSizeFrame(ctx)); - this.recType = recType; - this.ctx = ctx; - this.deserializer = deserializer; - } - - @Override - public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { - AbstractHDFSReader inReader = (AbstractHDFSReader) in; - Object record; - try { - inReader.initialize(); - record = inReader.readNext(); - while (record != null) { - tb.reset(); - deserializer.parse(record, tb.getDataOutput()); - tb.addFieldEndOffset(); - //append indexing fields - appendIndexingData(tb, inReader); - addTupleToFrame(writer); - record = inReader.readNext(); - } - appender.flush(writer, true); - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - protected abstract void appendIndexingData(ArrayTupleBuilder tb, - AbstractHDFSReader inReader) throws Exception; - - protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException { - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - appender.flush(writer, true); - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new IllegalStateException("Record is too big to fit in a frame"); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java deleted file mode 100644 index c94be6a..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedControlledTupleParser.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.asterix.external.indexing.IndexingConstants; -import org.apache.asterix.external.indexing.input.AbstractHDFSLookupInputStream; -import org.apache.asterix.om.base.AInt32; -import org.apache.asterix.om.base.AInt64; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.runtime.operators.file.IDataParser; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.INullWriter; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; - -/** - * class implementation for IControlledTupleParser. It provides common - * functionality involved in parsing data in an external text format (adm or delimited text) in a pipelined manner and packing - * frames with formed tuples. - */ -public class AdmOrDelimitedControlledTupleParser implements IControlledTupleParser { - - private ArrayTupleBuilder tb; - private transient DataOutput dos; - private final FrameTupleAppender appender; - protected final ARecordType recType; - private IDataParser parser; - private boolean propagateInput; - private int[] propagatedFields; - private int[] ridFields; - private RecordDescriptor inRecDesc; - private FrameTupleAccessor tupleAccessor; - private FrameTupleReference frameTuple; - private ByteBufferInputStream bbis; - private DataInputStream dis; - private AbstractHDFSLookupInputStream in; - private boolean parserInitialized = false; - private boolean retainNull; - protected byte nullByte; - protected ArrayTupleBuilder nullTupleBuild; - - public AdmOrDelimitedControlledTupleParser(IHyracksTaskContext ctx, ARecordType recType, - AbstractHDFSLookupInputStream in, boolean propagateInput, RecordDescriptor inRecDesc, IDataParser parser, - int[] propagatedFields, int[] ridFields, boolean retainNull, INullWriterFactory iNullWriterFactory) - throws HyracksDataException { - this.recType = recType; - this.in = in; - this.propagateInput = propagateInput; - this.retainNull = retainNull; - this.inRecDesc = inRecDesc; - this.propagatedFields = propagatedFields; - this.ridFields = ridFields; - this.parser = parser; - this.tupleAccessor = new FrameTupleAccessor(inRecDesc); - appender = new FrameTupleAppender(new VSizeFrame(ctx)); - if (propagateInput) { - tb = new ArrayTupleBuilder(propagatedFields.length + 1); - } else { - tb = new ArrayTupleBuilder(1); - } - frameTuple = new FrameTupleReference(); - dos = tb.getDataOutput(); - bbis = new ByteBufferInputStream(); - dis = new DataInputStream(bbis); - nullByte = ATypeTag.NULL.serialize(); - if (retainNull) { - INullWriter nullWriter = iNullWriterFactory.createNullWriter(); - nullTupleBuild = new ArrayTupleBuilder(1); - DataOutput out = nullTupleBuild.getDataOutput(); - try { - nullWriter.writeNull(out); - } catch (IOException e) { - e.printStackTrace(); - } - } else { - nullTupleBuild = null; - } - } - - @Override - public void close(IFrameWriter writer) throws Exception { - try { - in.close(); - appender.flush(writer, true); - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - @Override - public void parseNext(IFrameWriter writer, ByteBuffer frameBuffer) throws HyracksDataException { - try { - int tupleCount = 0; - int tupleIndex = 0; - tupleAccessor.reset(frameBuffer); - tupleCount = tupleAccessor.getTupleCount(); - int fieldSlotsLength = tupleAccessor.getFieldSlotsLength(); - // Loop over tuples - while (tupleIndex < tupleCount) { - boolean found = false; - int tupleStartOffset = tupleAccessor.getTupleStartOffset(tupleIndex) + fieldSlotsLength; - int fileNumberStartOffset = tupleAccessor.getFieldStartOffset(tupleIndex, - ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]); - // Check if null <- for outer join -> - if (frameBuffer.get(tupleStartOffset + fileNumberStartOffset) == nullByte) { - } else { - // Get file number - bbis.setByteBuffer(frameBuffer, tupleStartOffset + fileNumberStartOffset); - int fileNumber = ((AInt32) inRecDesc - .getFields()[ridFields[IndexingConstants.FILE_NUMBER_FIELD_INDEX]].deserialize(dis)) - .getIntegerValue(); - // Get record offset - bbis.setByteBuffer(frameBuffer, tupleStartOffset + tupleAccessor.getFieldStartOffset(tupleIndex, - ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX])); - long recordOffset = ((AInt64) inRecDesc - .getFields()[ridFields[IndexingConstants.RECORD_OFFSET_FIELD_INDEX]].deserialize(dis)) - .getLongValue(); - found = in.fetchRecord(fileNumber, recordOffset); - } - if (found) { - // Since we now know the inputStream is ready, we can safely initialize the parser - // We can't do that earlier since the parser will start pulling from the stream and if it is not ready, - // The parser will automatically release its resources - if (!parserInitialized) { - parser.initialize(in, recType, true); - parserInitialized = true; - } - tb.reset(); - if (propagateInput) { - frameTuple.reset(tupleAccessor, tupleIndex); - for (int i = 0; i < propagatedFields.length; i++) { - dos.write(frameTuple.getFieldData(propagatedFields[i]), - frameTuple.getFieldStart(propagatedFields[i]), - frameTuple.getFieldLength(propagatedFields[i])); - tb.addFieldEndOffset(); - } - } - parser.parse(tb.getDataOutput()); - tb.addFieldEndOffset(); - addTupleToFrame(writer); - } else if (propagateInput && retainNull) { - tb.reset(); - frameTuple.reset(tupleAccessor, tupleIndex); - for (int i = 0; i < propagatedFields.length; i++) { - dos.write(frameTuple.getFieldData(propagatedFields[i]), - frameTuple.getFieldStart(propagatedFields[i]), - frameTuple.getFieldLength(propagatedFields[i])); - tb.addFieldEndOffset(); - } - dos.write(nullTupleBuild.getByteArray()); - tb.addFieldEndOffset(); - addTupleToFrame(writer); - } - tupleIndex++; - } - } catch (Exception e) { - // un expected error, we try to close the inputstream and throw an exception - try { - in.close(); - } catch (IOException e1) { - e1.printStackTrace(); - } - throw new HyracksDataException(e); - } - } - - // For debugging - public void prettyPrint(FrameTupleAccessor tupleAccessor, RecordDescriptor recDesc) { - ByteBufferInputStream bbis = new ByteBufferInputStream(); - DataInputStream dis = new DataInputStream(bbis); - int tc = tupleAccessor.getTupleCount(); - System.err.println("TC: " + tc); - for (int i = 0; i < tc; ++i) { - System.err.print( - i + ":(" + tupleAccessor.getTupleStartOffset(i) + ", " + tupleAccessor.getTupleEndOffset(i) + ")["); - for (int j = 0; j < tupleAccessor.getFieldCount(); ++j) { - System.err.print(j + ":(" + tupleAccessor.getFieldStartOffset(i, j) + ", " - + tupleAccessor.getFieldEndOffset(i, j) + ") "); - System.err.print("{"); - bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i) - + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j)); - try { - byte tag = dis.readByte(); - if (tag == nullByte) { - System.err.print("NULL"); - } else { - bbis.setByteBuffer(tupleAccessor.getBuffer(), tupleAccessor.getTupleStartOffset(i) - + tupleAccessor.getFieldSlotsLength() + tupleAccessor.getFieldStartOffset(i, j)); - System.err.print(recDesc.getFields()[j].deserialize(dis)); - } - } catch (IOException e) { - e.printStackTrace(); - } - System.err.print("}"); - } - System.err.println("]"); - } - } - - protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException { - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - appender.flush(writer, true); - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new IllegalStateException(); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java deleted file mode 100644 index 6abcbb8..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/AdmOrDelimitedIndexingTupleParser.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.InputStream; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.indexing.input.AbstractHDFSReader; -import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.asterix.runtime.operators.file.IDataParser; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.std.file.ITupleParser; - -public class AdmOrDelimitedIndexingTupleParser implements ITupleParser { - - private ArrayTupleBuilder tb; - private final FrameTupleAppender appender; - private final ARecordType recType; - private final IDataParser parser; - private final AMutableInt32 aMutableInt = new AMutableInt32(0); - private final AMutableInt64 aMutableLong = new AMutableInt64(0); - - @SuppressWarnings("rawtypes") - private ISerializerDeserializer intSerde = AqlSerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT32); - @SuppressWarnings("rawtypes") - private ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT64); - - public AdmOrDelimitedIndexingTupleParser(IHyracksCommonContext ctx, ARecordType recType, IDataParser parser) - throws HyracksDataException { - this.parser = parser; - this.recType = recType; - appender = new FrameTupleAppender(new VSizeFrame(ctx)); - tb = new ArrayTupleBuilder(3); - } - - @Override - public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { - // Cast the input stream to a record reader - AbstractHDFSReader inReader = (AbstractHDFSReader) in; - try { - parser.initialize(in, recType, true); - while (true) { - tb.reset(); - if (!parser.parse(tb.getDataOutput())) { - break; - } - tb.addFieldEndOffset(); - appendIndexingData(tb, inReader); - addTupleToFrame(writer); - } - appender.flush(writer, true); - } catch (AsterixException ae) { - throw new HyracksDataException(ae); - } catch (Exception ioe) { - throw new HyracksDataException(ioe); - } - } - - // This function is used to append RID to Hyracks tuple - @SuppressWarnings("unchecked") - private void appendIndexingData(ArrayTupleBuilder tb, AbstractHDFSReader inReader) throws Exception { - aMutableInt.setValue(inReader.getFileNumber()); - aMutableLong.setValue(inReader.getReaderPosition()); - tb.addField(intSerde, aMutableInt); - tb.addField(longSerde, aMutableLong); - } - - private void addTupleToFrame(IFrameWriter writer) throws HyracksDataException { - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - appender.flush(writer, true); - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new IllegalStateException("Record is too big to fit in a frame"); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java deleted file mode 100644 index 9271ebe..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/FileIndexTupleTranslator.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.IOException; - -import org.apache.asterix.builders.RecordBuilder; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.indexing.ExternalFile; -import org.apache.asterix.external.indexing.FilesIndexDescription; -import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider; -import org.apache.asterix.om.base.ADateTime; -import org.apache.asterix.om.base.AInt64; -import org.apache.asterix.om.base.AMutableDateTime; -import org.apache.asterix.om.base.AMutableInt32; -import org.apache.asterix.om.base.AMutableInt64; -import org.apache.asterix.om.base.AMutableString; -import org.apache.asterix.om.base.AString; -import org.apache.asterix.om.types.BuiltinType; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; - -@SuppressWarnings("unchecked") -public class FileIndexTupleTranslator { - private final FilesIndexDescription filesIndexDescription = new FilesIndexDescription(); - private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder( - filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFieldCount()); - private RecordBuilder recordBuilder = new RecordBuilder(); - private ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage(); - private AMutableInt32 aInt32 = new AMutableInt32(0); - private AMutableInt64 aInt64 = new AMutableInt64(0); - private AMutableString aString = new AMutableString(null); - private AMutableDateTime aDateTime = new AMutableDateTime(0); - private ISerializerDeserializer stringSerde = AqlSerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ASTRING); - private ISerializerDeserializer dateTimeSerde = AqlSerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ADATETIME); - private ISerializerDeserializer longSerde = AqlSerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AINT64); - private ArrayTupleReference tuple = new ArrayTupleReference(); - - public ITupleReference getTupleFromFile(ExternalFile file) throws IOException, AsterixException { - tupleBuilder.reset(); - //File Number - aInt32.setValue(file.getFileNumber()); - filesIndexDescription.FILE_INDEX_RECORD_DESCRIPTOR.getFields()[0].serialize(aInt32, - tupleBuilder.getDataOutput()); - tupleBuilder.addFieldEndOffset(); - - //File Record - recordBuilder.reset(filesIndexDescription.EXTERNAL_FILE_RECORD_TYPE); - // write field 0 (File Name) - fieldValue.reset(); - aString.setValue(file.getFileName()); - stringSerde.serialize(aString, fieldValue.getDataOutput()); - recordBuilder.addField(0, fieldValue); - - //write field 1 (File Size) - fieldValue.reset(); - aInt64.setValue(file.getSize()); - longSerde.serialize(aInt64, fieldValue.getDataOutput()); - recordBuilder.addField(1, fieldValue); - - //write field 2 (File Mod Date) - fieldValue.reset(); - aDateTime.setValue(file.getLastModefiedTime().getTime()); - dateTimeSerde.serialize(aDateTime, fieldValue.getDataOutput()); - recordBuilder.addField(2, fieldValue); - - //write the record - recordBuilder.write(tupleBuilder.getDataOutput(), true); - tupleBuilder.addFieldEndOffset(); - tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); - return tuple; - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java deleted file mode 100644 index b38b835..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSIndexingParserFactory.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.util.Map; - -import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory; -import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.runtime.operators.file.ADMDataParser; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.DelimitedDataParser; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.file.ITupleParser; -import org.apache.hyracks.dataflow.std.file.ITupleParserFactory; - -/** - * This is the parser factory for parsers used to do indexing - */ -public class HDFSIndexingParserFactory implements ITupleParserFactory { - - private static final long serialVersionUID = 1L; - // file input-format - private final String inputFormat; - // content format - private final String format; - // delimiter in case of delimited text - private final char delimiter; - // quote in case of delimited text - private final char quote; - // parser class name in case of binary format - private final String parserClassName; - // the expected data type - private final ARecordType atype; - // the hadoop job conf - private transient JobConf jobConf; - // adapter arguments - private Map arguments; - - public HDFSIndexingParserFactory(ARecordType atype, String inputFormat, String format, char delimiter, char quote, - String parserClassName) { - this.inputFormat = inputFormat; - this.format = format; - this.parserClassName = parserClassName; - this.delimiter = delimiter; - this.quote = quote; - this.atype = atype; - } - - @Override - public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException { - if (format == null) { - throw new IllegalArgumentException("Unspecified data format"); - } - if (inputFormat == null) { - throw new IllegalArgumentException("Unspecified data format"); - } - if (!inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC) - && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_TEXT) - && !inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_SEQUENCE)) { - throw new IllegalArgumentException("External Indexing not supportd for format " + inputFormat); - } - // Do some real work here - /* - * Choices are: - * 1. TxtOrSeq (Object) indexing tuple parser - * 2. RC indexing tuple parser - * 3. textual data tuple parser - */ - if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_ADM)) { - // choice 3 with adm data parser - ADMDataParser dataParser = new ADMDataParser(); - return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser); - } else if (format.equalsIgnoreCase(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) { - // choice 3 with delimited data parser - DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser(atype, delimiter, quote); - return new AdmOrDelimitedIndexingTupleParser(ctx, atype, dataParser); - } - - // binary data with a special parser --> create the parser - IAsterixHDFSRecordParser objectParser; - if (parserClassName.equalsIgnoreCase(HDFSAdapterFactory.PARSER_HIVE)) { - objectParser = new HiveObjectParser(); - } else { - try { - objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance(); - } catch (Exception e) { - throw new HyracksDataException("Unable to create object parser", e); - } - } - try { - objectParser.initialize(atype, arguments, jobConf); - } catch (Exception e) { - throw new HyracksDataException("Unable to initialize object parser", e); - } - - if (inputFormat.equalsIgnoreCase(HDFSAdapterFactory.INPUT_FORMAT_RC)) { - // Case 2 - return new RCFileIndexingTupleParser(ctx, atype, objectParser); - } else { - // Case 1 - return new TextOrSeqIndexingTupleParser(ctx, atype, objectParser); - } - } - - public JobConf getJobConf() { - return jobConf; - } - - public void setJobConf(JobConf jobConf) { - this.jobConf = jobConf; - } - - public Map getArguments() { - return arguments; - } - - public void setArguments(Map arguments) { - this.arguments = arguments; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java deleted file mode 100644 index d9ce7aa..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapter.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.IOException; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Map; - -import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory; -import org.apache.asterix.external.adapter.factory.HDFSIndexingAdapterFactory; -import org.apache.asterix.external.dataset.adapter.IControlledAdapter; -import org.apache.asterix.external.indexing.ExternalFileIndexAccessor; -import org.apache.asterix.external.indexing.input.RCFileLookupReader; -import org.apache.asterix.external.indexing.input.SequenceFileLookupInputStream; -import org.apache.asterix.external.indexing.input.SequenceFileLookupReader; -import org.apache.asterix.external.indexing.input.TextFileLookupInputStream; -import org.apache.asterix.external.indexing.input.TextFileLookupReader; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.operators.file.ADMDataParser; -import org.apache.asterix.runtime.operators.file.AsterixTupleParserFactory; -import org.apache.asterix.runtime.operators.file.DelimitedDataParser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.INullWriterFactory; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class HDFSLookupAdapter implements IControlledAdapter, Serializable { - - private static final long serialVersionUID = 1L; - - private RecordDescriptor inRecDesc; - private boolean propagateInput; - private int[] ridFields; - private int[] propagatedFields; - private IAType atype; - private Map configuration; - private IHyracksTaskContext ctx; - private IControlledTupleParser parser; - private ExternalFileIndexAccessor fileIndexAccessor; - private boolean retainNull; - - public HDFSLookupAdapter(IAType atype, RecordDescriptor inRecDesc, Map adapterConfiguration, - boolean propagateInput, int[] ridFields, int[] propagatedFields, IHyracksTaskContext ctx, - ExternalFileIndexAccessor fileIndexAccessor, boolean retainNull) { - this.configuration = adapterConfiguration; - this.atype = atype; - this.ctx = ctx; - this.inRecDesc = inRecDesc; - this.propagatedFields = propagatedFields; - this.propagateInput = propagateInput; - this.propagatedFields = propagatedFields; - this.fileIndexAccessor = fileIndexAccessor; - this.ridFields = ridFields; - this.retainNull = retainNull; - } - - /* - * This function is not easy to read and could be refactored into a better structure but for now it works - */ - @Override - public void initialize(IHyracksTaskContext ctx, INullWriterFactory iNullWriterFactory) throws Exception { - JobConf jobConf = HDFSAdapterFactory.configureJobConf(configuration); - // Create the lookup reader and the controlled parser - if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_RC)) { - configureRCFile(jobConf, iNullWriterFactory); - } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT) - .equals(AsterixTupleParserFactory.FORMAT_ADM)) { - // create an adm parser - ADMDataParser dataParser = new ADMDataParser(); - if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) { - // Text input format - TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf); - parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput, - inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory); - } else { - // Sequence input format - SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf); - parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput, - inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory); - } - } else if (configuration.get(AsterixTupleParserFactory.KEY_FORMAT) - .equals(AsterixTupleParserFactory.FORMAT_DELIMITED_TEXT)) { - // create a delimited text parser - char delimiter = AsterixTupleParserFactory.getDelimiter(configuration); - char quote = AsterixTupleParserFactory.getQuote(configuration, delimiter); - - DelimitedDataParser dataParser = HDFSIndexingAdapterFactory.getDelimitedDataParser((ARecordType) atype, - delimiter, quote); - if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) { - // Text input format - TextFileLookupInputStream in = new TextFileLookupInputStream(fileIndexAccessor, jobConf); - parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput, - inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory); - } else { - // Sequence input format - SequenceFileLookupInputStream in = new SequenceFileLookupInputStream(fileIndexAccessor, jobConf); - parser = new AdmOrDelimitedControlledTupleParser(ctx, (ARecordType) atype, in, propagateInput, - inRecDesc, dataParser, propagatedFields, ridFields, retainNull, iNullWriterFactory); - } - } else { - configureGenericSeqOrText(jobConf, iNullWriterFactory); - } - } - - private void configureGenericSeqOrText(JobConf jobConf, INullWriterFactory iNullWriterFactory) throws IOException { - if (configuration.get(HDFSAdapterFactory.KEY_INPUT_FORMAT).equals(HDFSAdapterFactory.INPUT_FORMAT_TEXT)) { - // Text input format - TextFileLookupReader reader = new TextFileLookupReader(fileIndexAccessor, jobConf); - parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput, - propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory); - } else { - // Sequence input format - SequenceFileLookupReader reader = new SequenceFileLookupReader(fileIndexAccessor, jobConf); - parser = new SeqOrTxtControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput, - propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory); - } - } - - @Override - public void nextFrame(ByteBuffer buffer, IFrameWriter writer) throws Exception { - parser.parseNext(writer, buffer); - } - - @Override - public void close(IFrameWriter writer) throws Exception { - parser.close(writer); - } - - @Override - public void fail() throws Exception { - // Do nothing - } - - private void configureRCFile(Configuration jobConf, INullWriterFactory iNullWriterFactory) - throws IOException, Exception { - // RCFileLookupReader - RCFileLookupReader reader = new RCFileLookupReader(fileIndexAccessor, - HDFSAdapterFactory.configureJobConf(configuration)); - parser = new RCFileControlledTupleParser(ctx, createRecordParser(jobConf), reader, propagateInput, - propagatedFields, inRecDesc, ridFields, retainNull, iNullWriterFactory); - } - - private IAsterixHDFSRecordParser createRecordParser(Configuration jobConf) throws HyracksDataException { - // Create the record parser - // binary data with a special parser --> create the parser - IAsterixHDFSRecordParser objectParser; - if (configuration.get(HDFSAdapterFactory.KEY_PARSER).equals(HDFSAdapterFactory.PARSER_HIVE)) { - objectParser = new HiveObjectParser(); - } else { - try { - objectParser = (IAsterixHDFSRecordParser) Class - .forName(configuration.get(HDFSAdapterFactory.KEY_PARSER)).newInstance(); - } catch (Exception e) { - throw new HyracksDataException("Unable to create object parser", e); - } - } - // initialize the parser - try { - objectParser.initialize((ARecordType) atype, configuration, jobConf); - } catch (Exception e) { - throw new HyracksDataException("Unable to initialize object parser", e); - } - - return objectParser; - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java deleted file mode 100644 index fab507d..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSLookupAdapterFactory.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.util.Map; - -import org.apache.asterix.external.adapter.factory.IControlledAdapterFactory; -import org.apache.asterix.external.dataset.adapter.IControlledAdapter; -import org.apache.asterix.external.indexing.ExternalFileIndexAccessor; -import org.apache.asterix.om.types.IAType; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; - -// This class takes care of creating the adapter based on the formats and input format -public class HDFSLookupAdapterFactory implements IControlledAdapterFactory { - - private static final long serialVersionUID = 1L; - - private Map adapterConfiguration; - private IAType atype; - private boolean propagateInput; - private int[] ridFields; - private int[] propagatedFields; - private boolean retainNull; - - @Override - public void configure(IAType atype, boolean propagateInput, int[] ridFields, - Map adapterConfiguration, boolean retainNull) { - this.adapterConfiguration = adapterConfiguration; - this.atype = atype; - this.propagateInput = propagateInput; - this.ridFields = ridFields; - this.retainNull = retainNull; - } - - @Override - public IControlledAdapter createAdapter(IHyracksTaskContext ctx, ExternalFileIndexAccessor fileIndexAccessor, - RecordDescriptor inRecDesc) { - if (propagateInput) { - configurePropagatedFields(inRecDesc); - } - return new HDFSLookupAdapter(atype, inRecDesc, adapterConfiguration, propagateInput, ridFields, - propagatedFields, ctx, fileIndexAccessor, retainNull); - } - - private void configurePropagatedFields(RecordDescriptor inRecDesc) { - int ptr = 0; - boolean skip = false; - propagatedFields = new int[inRecDesc.getFieldCount() - ridFields.length]; - for (int i = 0; i < inRecDesc.getFieldCount(); i++) { - if (ptr < ridFields.length) { - skip = false; - for (int j = 0; j < ridFields.length; j++) { - if (ridFields[j] == i) { - ptr++; - skip = true; - break; - } - } - if (!skip) - propagatedFields[i - ptr] = i; - } else { - propagatedFields[i - ptr] = i; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java deleted file mode 100644 index f42a6d1..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParser.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.io.InputStream; - -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.external.indexing.input.AbstractHDFSReader; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.std.file.ITupleParser; - -/* - * This class is used with hdfs objects instead of hdfs - */ -public class HDFSObjectTupleParser implements ITupleParser{ - - private ArrayTupleBuilder tb; - private final FrameTupleAppender appender; - private IAsterixHDFSRecordParser deserializer; - - public HDFSObjectTupleParser(IHyracksCommonContext ctx, ARecordType recType, IAsterixHDFSRecordParser deserializer) - throws HyracksDataException { - appender = new FrameTupleAppender(new VSizeFrame(ctx)); - this.deserializer = deserializer; - tb = new ArrayTupleBuilder(1); - } - - @Override - public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException { - AbstractHDFSReader reader = (AbstractHDFSReader) in; - Object object; - try { - reader.initialize(); - object = reader.readNext(); - while (object!= null) { - tb.reset(); - deserializer.parse(object, tb.getDataOutput()); - tb.addFieldEndOffset(); - addTupleToFrame(writer); - object = reader.readNext(); - } - appender.flush(writer, true); - } catch (AsterixException ae) { - throw new HyracksDataException(ae); - } catch (Exception e) { - throw new HyracksDataException(e); - } - } - - protected void addTupleToFrame(IFrameWriter writer) throws HyracksDataException { - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - appender.flush(writer, true); - if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) { - throw new IllegalStateException(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/284590ed/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java ---------------------------------------------------------------------- diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java deleted file mode 100644 index ac3a92f..0000000 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/dataflow/HDFSObjectTupleParserFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.indexing.dataflow; - -import java.util.Map; - -import org.apache.asterix.external.adapter.factory.HDFSAdapterFactory; -import org.apache.asterix.om.types.ARecordType; -import org.apache.hyracks.api.context.IHyracksCommonContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.dataflow.std.file.ITupleParser; -import org.apache.hyracks.dataflow.std.file.ITupleParserFactory; - -public class HDFSObjectTupleParserFactory implements ITupleParserFactory{ - private static final long serialVersionUID = 1L; - // parser class name in case of binary format - private String parserClassName; - // the expected data type - private ARecordType atype; - // the hadoop job conf - private HDFSAdapterFactory adapterFactory; - // adapter arguments - private Map arguments; - - public HDFSObjectTupleParserFactory(ARecordType atype, HDFSAdapterFactory adapterFactory, Map arguments){ - this.parserClassName = (String) arguments.get(HDFSAdapterFactory.KEY_PARSER); - this.atype = atype; - this.arguments = arguments; - this.adapterFactory = adapterFactory; - } - - @Override - public ITupleParser createTupleParser(IHyracksCommonContext ctx) throws HyracksDataException { - IAsterixHDFSRecordParser objectParser; - if (parserClassName.equals(HDFSAdapterFactory.PARSER_HIVE)) { - objectParser = new HiveObjectParser(); - } else { - try { - objectParser = (IAsterixHDFSRecordParser) Class.forName(parserClassName).newInstance(); - } catch (Exception e) { - throw new HyracksDataException("Unable to create object parser", e); - } - } - try { - objectParser.initialize(atype, arguments, adapterFactory.getJobConf()); - } catch (Exception e) { - throw new HyracksDataException("Unable to initialize object parser", e); - } - - return new HDFSObjectTupleParser(ctx, atype, objectParser); - } - -}