drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-3963) Read raw key value bytes from sequence files
Date Thu, 29 Oct 2015 23:00:29 GMT

    [ https://issues.apache.org/jira/browse/DRILL-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14981520#comment-14981520 ]

ASF GitHub Bot commented on DRILL-3963:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/214#discussion_r43455344
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java ---
    @@ -0,0 +1,167 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.easy.sequencefile;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.ArrayList;
    +import java.util.concurrent.TimeUnit;
    +import java.security.PrivilegedExceptionAction;
    +
    +import com.google.common.base.Stopwatch;
    +import org.apache.drill.common.exceptions.DrillRuntimeException;
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.store.AbstractRecordReader;
    +import org.apache.drill.common.types.TypeProtos.MajorType;
    +import org.apache.drill.exec.store.dfs.DrillFileSystem;
    +import org.apache.drill.exec.util.ImpersonationUtil;
    +import org.apache.drill.exec.vector.NullableVarBinaryVector;
    +import org.apache.hadoop.io.BytesWritable;
    +import org.apache.hadoop.mapred.InputFormat;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hadoop.mapred.Reporter;
    +import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +
    +public class SequenceFileRecordReader extends AbstractRecordReader {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SequenceFileRecordReader.class);
    +
    +  private static final int PER_BATCH_RECORD_COUNT = 4096;
    +  private static final int PER_BATCH_BYTES = 256*1024;
    +
    +  private static final MajorType KEY_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
    +  private static final MajorType VALUE_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
    +
    +  private final SchemaPath keySchema = SchemaPath.getSimplePath("binary_key");
    +  private final SchemaPath valueSchema = SchemaPath.getSimplePath("binary_value");
    +
    +  private NullableVarBinaryVector keyVector;
    +  private NullableVarBinaryVector valueVector;
    +  private final FileSplit split;
    +  private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> reader;
    +  private final BytesWritable key = new BytesWritable();
    +  private final BytesWritable value = new BytesWritable();
    +  private final DrillFileSystem dfs;
    +  private final String queryUserName;
    +  private final String opUserName;
    +
    +  public SequenceFileRecordReader(final FileSplit split,
    +                                  final DrillFileSystem dfs,
    +                                  final String queryUserName,
    +                                  final String opUserName) {
    +    final List<SchemaPath> columns = new ArrayList();
    +    columns.add(keySchema);
    +    columns.add(valueSchema);
    +    setColumns(columns);
    +    this.dfs = dfs;
    +    this.split = split;
    +    this.queryUserName = queryUserName;
    +    this.opUserName = opUserName;
    +  }
    +
    +  @Override
    +  protected boolean isSkipQuery() {
    +    return false;
    +  }
    +
    +  private org.apache.hadoop.mapred.RecordReader getRecordReader(final InputFormat inputFormat,
    +                                                                final JobConf jobConf)
throws ExecutionSetupException {
    +    try {
    +      final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName,
this.queryUserName);
    +      return ugi.doAs(new PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader>()
{
    +        @Override
    +        public org.apache.hadoop.mapred.RecordReader run() throws Exception {
    +          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
    +        }
    +      });
    +    } catch (IOException | InterruptedException e) {
    +      throw new ExecutionSetupException(
    +        String.format("Error in creating sequencefile reader for file: %s, start: %d,
length: %d",
    +          split.getPath(), split.getStart(), split.getLength()), e);
    +    }
    +  }
    +
    +  @Override
    +  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException
{
    +    final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
    +    final JobConf jobConf = new JobConf(dfs.getConf());
    +    jobConf.setInputFormat(inputFormat.getClass());
    +    this.reader = getRecordReader(inputFormat, jobConf);
    --- End diff --
    
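    Unchecked assignment (getRecordReader returns a raw RecordReader, but the field is RecordReader<BytesWritable, BytesWritable>), and an unnecessary "this" qualifier?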


> Read raw key value bytes from sequence files
> --------------------------------------------
>
>                 Key: DRILL-3963
>                 URL: https://issues.apache.org/jira/browse/DRILL-3963
>             Project: Apache Drill
>          Issue Type: New Feature
>            Reporter: amit hadke
>            Assignee: amit hadke
>
> Sequence files store a list of key-value pairs. Keys and values are of type Hadoop Writable.
> Provide a format plugin that reads raw bytes out of sequence files, which can be further
> deserialized by a UDF (from Hadoop Writable to a Drill type).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message