drill-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-4081) Handle schema changes in ExternalSort
Date Mon, 16 Nov 2015 02:10:11 GMT

    [ https://issues.apache.org/jira/browse/DRILL-4081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15006150#comment-15006150 ]

ASF GitHub Bot commented on DRILL-4081:
---------------------------------------

Github user jacques-n commented on a diff in the pull request:

    https://github.com/apache/drill/pull/257#discussion_r44883581
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java ---
    @@ -0,0 +1,159 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.record;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos.DataMode;
    +import org.apache.drill.common.types.TypeProtos.MajorType;
    +import org.apache.drill.common.types.TypeProtos.MinorType;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.OperatorContext;
    +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
    +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
    +import org.apache.drill.exec.vector.ValueVector;
    +import org.apache.drill.exec.vector.complex.UnionVector;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Utility class for dealing with changing schemas
    + */
    +public class SchemaUtil {
    +
    +  /**
    +   * Returns the merger of the given schemas. The merged schema will include the union of all columns. If there is a type conflict
    +   * between columns with the same SchemaPath but different types, the merged schema will contain a Union type.
    +   * @param schemas the schemas to merge
    +   * @return a schema containing every column that appears in any input schema
    +   */
    +  public static BatchSchema mergeSchemas(BatchSchema... schemas) {
    +    Map<SchemaPath,Set<MinorType>> typeSetMap = Maps.newLinkedHashMap();
    +
    +    for (BatchSchema s : schemas) {
    +      for (MaterializedField field : s) {
    +        SchemaPath path = field.getPath();
    +        Set<MinorType> currentTypes = typeSetMap.get(path);
    +        if (currentTypes == null) {
    +          currentTypes = Sets.newHashSet();
    +          typeSetMap.put(path, currentTypes);
    +        }
    +        MinorType newType = field.getType().getMinorType();
    +        if (newType == MinorType.UNION) {
    +          for (MinorType subType : field.getType().getSubTypeList()) {
    +            currentTypes.add(subType);
    +          }
    +        } else {
    +          currentTypes.add(newType);
    +        }
    +      }
    +    }
    +
    +    List<MaterializedField> fields = Lists.newArrayList();
    +
    +    for (SchemaPath path : typeSetMap.keySet()) {
    +      Set<MinorType> types = typeSetMap.get(path);
    +      if (types.size() > 1) {
    +        MajorType.Builder builder = MajorType.newBuilder().setMinorType(MinorType.UNION).setMode(DataMode.OPTIONAL);
    +        for (MinorType t : types) {
    +          builder.addSubType(t);
    +        }
    +        MaterializedField field = MaterializedField.create(path, builder.build());
    +        fields.add(field);
    +      } else {
    +        MaterializedField field = MaterializedField.create(path, Types.optional(types.iterator().next()));
    +        fields.add(field);
    +      }
    +    }
    +
    +    SchemaBuilder schemaBuilder = new SchemaBuilder();
    +    BatchSchema s = schemaBuilder.addFields(fields).setSelectionVectorMode(schemas[0].getSelectionVectorMode()).build();
    +    return s;
    +  }
    +
    +  /**
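    +   * Creates a copy of a record batch, converting any fields as necessary to coerce it into the provided schema.
    +   * @param in the incoming batch to copy
    +   * @param toSchema the schema to coerce the batch into
    +   * @param context the operator context passed to the new VectorContainer
    +   * @return a new container whose vectors follow toSchema
    +   */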
    +  public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) {
    +    int recordCount = in.getRecordCount();
    +    Map<SchemaPath,ValueVector> vectorMap = Maps.newHashMap();
    +    for (VectorWrapper w : in) {
    +      ValueVector v = w.getValueVector();
    +      vectorMap.put(v.getField().getPath(), v);
    +    }
    +
    +    VectorContainer c = new VectorContainer(context);
    +
    +    for (MaterializedField field : toSchema) {
    +      ValueVector v = vectorMap.get(field.getPath());
    +      if (v != null) {
    +        int valueCount = v.getAccessor().getValueCount();
    +        TransferPair tp = v.getTransferPair();
    +        tp.transfer();
    +        if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
    --- End diff --
    
    Again, I'm wondering what happens here in the case of a map type with different children.
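
The merge rule implemented by `mergeSchemas` in the diff (collect every `MinorType` seen for each column path, flatten any existing union into its member types, and emit a `UNION` field wherever more than one type was seen) can be illustrated with a stripped-down, self-contained sketch. The `MinorType` enum and `Field` record below are hypothetical stand-ins for Drill's `TypeProtos.MinorType` and `MaterializedField`, not Drill's API. The sketch ignores data modes (the diff marks every merged field OPTIONAL) and selection-vector modes, and it does not model map columns or their children.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeSchemasSketch {
  // Hypothetical stand-in for Drill's TypeProtos.MinorType (only a few members).
  enum MinorType { INT, BIGINT, FLOAT8, VARCHAR, UNION }

  // Hypothetical stand-in for MaterializedField: a column path, its type, and,
  // for UNION columns only, the member types.
  record Field(String path, MinorType type, Set<MinorType> subTypes) {
    static Field of(String path, MinorType type) {
      return new Field(path, type, EnumSet.noneOf(MinorType.class));
    }
  }

  static List<Field> mergeSchemas(List<List<Field>> schemas) {
    // LinkedHashMap keeps columns in first-seen order, as in the diff.
    Map<String, Set<MinorType>> typeSetMap = new LinkedHashMap<>();
    for (List<Field> schema : schemas) {
      for (Field field : schema) {
        Set<MinorType> types =
            typeSetMap.computeIfAbsent(field.path(), p -> EnumSet.noneOf(MinorType.class));
        if (field.type() == MinorType.UNION) {
          types.addAll(field.subTypes()); // flatten an existing union into its members
        } else {
          types.add(field.type());
        }
      }
    }

    List<Field> merged = new ArrayList<>();
    for (Map.Entry<String, Set<MinorType>> entry : typeSetMap.entrySet()) {
      Set<MinorType> types = entry.getValue();
      if (types.size() > 1) {
        // Conflicting types for one path become a single UNION column.
        merged.add(new Field(entry.getKey(), MinorType.UNION, EnumSet.copyOf(types)));
      } else {
        merged.add(Field.of(entry.getKey(), types.iterator().next()));
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Field> first = List.of(Field.of("a", MinorType.INT), Field.of("b", MinorType.VARCHAR));
    List<Field> second = List.of(Field.of("a", MinorType.VARCHAR), Field.of("c", MinorType.FLOAT8));
    for (Field f : mergeSchemas(List.of(first, second))) {
      System.out.println(f.path() + " " + f.type() + " " + f.subTypes());
    }
  }
}
```

Running `main` prints `a UNION [INT, VARCHAR]`, then `b VARCHAR []` and `c FLOAT8 []`: only column `a` has a conflict, so only it becomes a union, while columns present in just one schema keep their single type.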


> Handle schema changes in ExternalSort
> -------------------------------------
>
>                 Key: DRILL-4081
>                 URL: https://issues.apache.org/jira/browse/DRILL-4081
>             Project: Apache Drill
>          Issue Type: Improvement
>            Reporter: Steven Phillips
>            Assignee: Steven Phillips
>
> This improvement will make use of the Union vector to handle schema changes. When a new schema appears, it will be "merged" with the previous schema. The result will be a new schema that uses the Union type to store the columns where there is a type conflict. All of the batches (including the batches that have already arrived) will be coerced into this new schema.
> A new comparison function will be included to handle the comparison of Union types. Comparison of Union types will work as follows:
> 1. All numeric types can be mutually compared, and will be compared using Drill's implicit cast rules.
> 2. All other types will not be compared against other types, but only among values of the same type.
> 3. There will be an overall precedence of types with regard to ordering. This precedence is not yet defined, but will be defined as part of the work on this issue.
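
The three comparison rules above can be sketched as a single `Comparator` over boxed Java values. This is an illustration under stated assumptions, not Drill's union comparison function: Java `Number`, `String`, and `Boolean` objects stand in for union members, the numeric widening only loosely approximates Drill's implicit cast rules, and because the issue leaves the cross-type precedence undefined, the `NUMERIC < STRING < BOOLEAN` order in the hypothetical `TypeGroup` enum is an arbitrary choice. Null values are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class UnionComparatorSketch {
  // Hypothetical precedence between type groups (rule 3). DRILL-4081 leaves this
  // order undefined; NUMERIC < STRING < BOOLEAN is an arbitrary illustrative choice.
  enum TypeGroup { NUMERIC, STRING, BOOLEAN }

  static TypeGroup groupOf(Object v) {
    if (v instanceof Number) return TypeGroup.NUMERIC;
    if (v instanceof String) return TypeGroup.STRING;
    if (v instanceof Boolean) return TypeGroup.BOOLEAN;
    throw new IllegalArgumentException("unsupported type: " + v.getClass());
  }

  static boolean isIntegral(Number n) {
    return n instanceof Byte || n instanceof Short || n instanceof Integer || n instanceof Long;
  }

  // Rule 1: numbers compare across types after widening, loosely mirroring an implicit
  // cast to the wider type: integral pairs as long, anything with a floating type as double.
  static int compareNumbers(Number a, Number b) {
    if (isIntegral(a) && isIntegral(b)) {
      return Long.compare(a.longValue(), b.longValue());
    }
    return Double.compare(a.doubleValue(), b.doubleValue());
  }

  static final Comparator<Object> UNION_ORDER = (a, b) -> {
    TypeGroup ga = groupOf(a);
    TypeGroup gb = groupOf(b);
    if (ga != gb) {
      // Rule 3: values from different groups are ordered by group precedence only.
      return ga.compareTo(gb);
    }
    // Rule 2: non-numeric values are compared only against values of their own type.
    switch (ga) {
      case NUMERIC: return compareNumbers((Number) a, (Number) b);
      case STRING: return ((String) a).compareTo((String) b);
      default: return Boolean.compare((Boolean) a, (Boolean) b);
    }
  };

  public static void main(String[] args) {
    List<Object> values = new ArrayList<>(Arrays.<Object>asList("b", 3, true, 2.5, "a", 10L, false));
    values.sort(UNION_ORDER);
    System.out.println(values); // [2.5, 3, 10, a, b, false, true]
  }
}
```

With the sample values in `main`, the `Integer`, `Double`, and `Long` values sort together by numeric value, followed by the strings and then the booleans, because each group is ordered internally before the (hypothetical) group precedence applies.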



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
