carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [1/2] incubator-carbondata git commit: clean hadoop
Date Mon, 09 Jan 2017 11:40:30 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 45211a4c5 -> 4703a9a86


clean hadoop


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/18cfba80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/18cfba80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/18cfba80

Branch: refs/heads/master
Commit: 18cfba808a70b6e972381d300c90e77c78023a79
Parents: 45211a4
Author: jackylk <jacky.likun@huawei.com>
Authored: Mon Jan 9 18:13:57 2017 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Mon Jan 9 19:39:35 2017 +0800

----------------------------------------------------------------------
 .../carbondata/hadoop/CacheAccessClient.java    |  2 +-
 .../apache/carbondata/hadoop/CacheClient.java   |  5 +-
 .../carbondata/hadoop/CarbonInputFormat.java    | 35 +++--------
 .../carbondata/hadoop/CarbonInputSplit.java     |  3 +
 .../hadoop/CarbonRawDataInputSplit.java         | 63 --------------------
 .../recorditerator/RecordReaderIterator.java    |  2 +-
 .../hadoop/util/CarbonInputFormatUtil.java      |  7 ---
 7 files changed, 16 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
index 984930d..0728400 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheAccessClient.java
@@ -36,7 +36,7 @@ public class CacheAccessClient<K, V> {
 
   private Cache<K, V> cache;
 
-  public CacheAccessClient(Cache cache) {
+  public CacheAccessClient(Cache<K, V> cache) {
     this.cache = cache;
   }
 
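For context, a minimal sketch of the raw-type pitfall this hunk removes; the Cache class below is a hypothetical stand-in, not CarbonData's. With a raw Cache parameter the compiler cannot check key and value types at the call site, so a mismatched cache fails only later with a ClassCastException; parameterizing the constructor as Cache<K, V> moves the error to compile time.

import java.util.HashMap;
import java.util.Map;

class Cache<K, V> {
  private final Map<K, V> store = new HashMap<>();
  V get(K key) { return store.get(key); }
  void put(K key, V value) { store.put(key, value); }
}

class Client<K, V> {
  private final Cache<K, V> cache;

  // Before: Client(Cache cache) accepted any cache unchecked.
  // After: javac verifies K and V at every construction site.
  Client(Cache<K, V> cache) {
    this.cache = cache;
  }

  V getIfPresent(K key) {
    return cache.get(key);
  }
}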

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
index 1982e2c..0dd54ee 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CacheClient.java
@@ -34,9 +34,8 @@ public class CacheClient {
       segmentAccessClient;
 
   public CacheClient(String storePath) {
-    Cache segmentCache = CacheProvider
-        .getInstance().<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper>createCache(
-            CacheType.DRIVER_BTREE, storePath);
+    Cache<TableSegmentUniqueIdentifier, SegmentTaskIndexWrapper> segmentCache =
+        CacheProvider.getInstance().createCache(CacheType.DRIVER_BTREE, storePath);
     segmentAccessClient = new CacheAccessClient<>(segmentCache);
   }
 

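Similarly for CacheClient: once the target variable is declared with concrete type arguments, javac infers the factory's type parameters, so the explicit type witness became redundant. A sketch with a hypothetical factory, not CarbonData's CacheProvider API:

import java.util.HashMap;
import java.util.Map;

final class Provider {
  static <K, V> Map<K, V> createCache(String name) {
    return new HashMap<>();  // stand-in for a real cache implementation
  }

  public static void main(String[] args) {
    // Before: explicit type witness on the generic method call.
    Map<String, Long> a = Provider.<String, Long>createCache("btree");
    // After: the declared target type drives inference; same result.
    Map<String, Long> b = Provider.createCache("btree");
    System.out.println(a.equals(b));
  }
}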
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 87f2e0a..5dba2da 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.scan.model.QueryModel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.InvalidPathException;
@@ -121,8 +120,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
-  public static void setTablePath(Configuration configuration, String tablePath)
-      throws IOException {
+  public static void setTablePath(Configuration configuration, String tablePath) {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
@@ -169,7 +167,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  public static CarbonTablePath getTablePath(Configuration configuration) throws IOException {
+  public static CarbonTablePath getTablePath(Configuration configuration) {
     AbsoluteTableIdentifier absIdentifier = getAbsoluteTableIdentifier(configuration);
     return CarbonStorePath.getCarbonTablePath(absIdentifier);
   }
@@ -241,6 +239,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     }
     CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
     FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+
     // do block filtering and get split
     List<InputSplit> splits = getSplits(job, filterInterface, cacheClient);
     cacheClient.close();
@@ -376,11 +375,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    */
   private List<TableBlockInfo> getTableBlockInfo(JobContext job,
       TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
-      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
-      List<String> updatedTaskList,
-      UpdateVO updateDetails,
-      SegmentUpdateStatusManager updateStatusManager,
-      String segmentId)
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, UpdateVO updateDetails,
+      SegmentUpdateStatusManager updateStatusManager, String segmentId)
     throws IOException {
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
 
@@ -450,19 +446,16 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @param segmentId
    * @return
    * @throws IOException
-   * @throws IndexBuilderException
    */
   private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
       JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-    List<String> updatedTaskList = null;
     boolean isSegmentUpdated = false;
     Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
     TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
         new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
-    SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
     segmentTaskIndexWrapper =
         cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
     UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
@@ -471,8 +464,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
         taskKeys = segmentIndexMap.keySet();
         isSegmentUpdated = true;
-        updatedTaskList =
-            statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
       }
     }
     // if segment tree is not loaded, load the segment tree
@@ -481,7 +472,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       // retrieved. the same will be filtered based on taskKeys , if the task is same
       // for the block then dont add it since already its btree is loaded.
       List<TableBlockInfo> tableBlockInfoList =
-          getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
+          getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys,
               updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
               segmentId);
       if (!tableBlockInfoList.isEmpty()) {
@@ -586,7 +577,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
-    CarbonReadSupport readSupport = getReadSupportClass(configuration);
+    CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
     return new CarbonRecordReader<T>(queryModel, readSupport);
   }
 
@@ -623,7 +614,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return queryModel;
   }
 
-  public CarbonReadSupport getReadSupportClass(Configuration configuration) {
+  public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
     String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
     //By default it uses dictionary decoder read class
     CarbonReadSupport readSupport = null;
@@ -646,14 +637,6 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return readSupport;
   }
 
-  @Override protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
-    return super.computeSplitSize(blockSize, minSize, maxSize);
-  }
-
-  @Override protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
-    return super.getBlockIndex(blkLocations, offset);
-  }
-
  @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
     String[] segmentsToConsider = getSegmentsToAccess(job);
@@ -736,7 +719,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * return valid segment to access
    */
-  private String[] getSegmentsToAccess(JobContext job) throws IOException {
+  private String[] getSegmentsToAccess(JobContext job) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
     if (segmentString.trim().isEmpty()) {
       return new String[0];

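This file gets the bulk of the cleanup: the unused updatedTaskList parameter and SegmentStatusManager local are gone, throws IOException is dropped from methods that cannot throw, CarbonReadSupport is parameterized, and two overrides that only delegated to super are deleted. A sketch of why removing a pure-delegation override is behavior-preserving (Base mirrors Hadoop's FileInputFormat.computeSplitSize formula; the class names here are hypothetical):

class Base {
  protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
}

class Derived extends Base {
  // The removed overrides looked like this:
  //   @Override protected long computeSplitSize(long b, long min, long max) {
  //     return super.computeSplitSize(b, min, max);  // pure delegation
  //   }
  // Deleting them changes nothing: dispatch lands on Base directly.
}

class SplitSizeDemo {
  public static void main(String[] args) {
    // 128 MB block, no min/max constraints: split size equals block size.
    System.out.println(new Derived().computeSplitSize(128L << 20, 1L, Long.MAX_VALUE));
  }
}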
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 16a8ef3..af24418 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -215,6 +215,9 @@ public class CarbonInputSplit extends FileSplit
   }
 
   @Override public int compareTo(Distributable o) {
+    if (o == null) {
+      return -1;
+    }
     CarbonInputSplit other = (CarbonInputSplit) o;
     int compareResult = 0;
     // get the segment id

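One note on the compareTo change: returning -1 for a null argument is a deliberate relaxation, since the Comparable contract says compareTo(null) should throw NullPointerException; the added guard trades contract strictness for defensiveness. A self-contained sketch of the resulting behavior:

class Split implements Comparable<Split> {
  final String segmentId;

  Split(String segmentId) { this.segmentId = segmentId; }

  @Override public int compareTo(Split o) {
    if (o == null) {
      return -1;  // this split orders before a null, instead of throwing
    }
    return segmentId.compareTo(o.segmentId);
  }

  public static void main(String[] args) {
    Split a = new Split("2");
    System.out.println(a.compareTo(null));           // -1, no NPE
    System.out.println(a.compareTo(new Split("1"))); // positive
  }
}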
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRawDataInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRawDataInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRawDataInputSplit.java
deleted file mode 100644
index 2c2e32d..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRawDataInputSplit.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-/**
- * Handles input splits for raw data
- */
-public class CarbonRawDataInputSplit extends InputSplit implements Writable {
-
-  long length;
-  String[] locations;
-
-  public CarbonRawDataInputSplit(long length, String[] locations) {
-    this.length = length;
-    this.locations = locations;
-  }
-
-  public static CarbonRawDataInputSplit from(FileSplit split) throws IOException {
-    return new CarbonRawDataInputSplit(split.getLength(), split.getLocations());
-  }
-
-  @Override public long getLength() throws IOException, InterruptedException {
-    return length;
-  }
-
-  @Override public String[] getLocations() throws IOException, InterruptedException {
-    return locations;
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-    out.writeLong(length);
-    out.writeLong(locations.length);
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-
-  }
-}

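Worth noting about the deleted class: its write() emitted length and locations.length but never the location strings themselves, and readFields() was empty, so the split could never survive a serialization round trip; removing the class also removes that latent bug. For reference, a minimal correct Writable round trip would look like this (a sketch, not proposed CarbonData code):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class LocationsWritable implements Writable {
  long length;
  String[] locations = new String[0];

  @Override public void write(DataOutput out) throws IOException {
    out.writeLong(length);
    out.writeInt(locations.length);
    for (String loc : locations) {
      out.writeUTF(loc);           // the deleted class skipped this step
    }
  }

  @Override public void readFields(DataInput in) throws IOException {
    length = in.readLong();
    locations = new String[in.readInt()];
    for (int i = 0; i < locations.length; i++) {
      locations[i] = in.readUTF(); // the deleted class left this method empty
    }
  }
}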
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
index a1bc1e9..60aa6c3 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
@@ -60,7 +60,7 @@ public class RecordReaderIterator extends CarbonIterator<Object []> {
         isConsumed = recordReader.nextKeyValue();
         return isConsumed;
       }
-      return isConsumed;
+      return true;
     } catch (Exception e) {
       throw new CarbonDataLoadingException(e);
     }

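The hasNext() change reads as behavior-preserving given the guard above it: on the surviving path a record is already buffered, so isConsumed can only be true, and returning the literal states the invariant directly. A sketch of the buffered-iterator pattern with a hypothetical reader interface:

interface Reader {
  boolean nextKeyValue() throws Exception;
}

class BufferedIterator {
  private final Reader reader;
  private boolean isConsumed;  // true while a record sits buffered, unread

  BufferedIterator(Reader reader) { this.reader = reader; }

  boolean hasNext() {
    try {
      if (!isConsumed) {
        isConsumed = reader.nextKeyValue();  // try to buffer the next record
        return isConsumed;
      }
      return true;  // already buffered; same value as "return isConsumed"
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}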
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/18cfba80/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 9100268..3c2eaf6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -123,11 +123,4 @@ public class CarbonInputFormatUtil {
       throw new RuntimeException("Error while resolving filter expression", e);
     }
   }
-
-  public static String processPath(String path) {
-    if (path != null && path.startsWith("file:")) {
-      return path.substring(5, path.length());
-    }
-    return path;
-  }
 }

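The removed processPath helper stripped a literal "file:" prefix via substring. If that normalization is ever needed again, a scheme-aware form goes through java.net.URI; a sketch, not a proposed replacement:

import java.net.URI;

class PathDemo {
  // Returns the local path for file: URIs, otherwise the input unchanged.
  static String localPart(String path) {
    URI uri = URI.create(path);
    return "file".equals(uri.getScheme()) ? uri.getPath() : path;
  }

  public static void main(String[] args) {
    System.out.println(localPart("file:/tmp/store"));  // /tmp/store
    System.out.println(localPart("/tmp/store"));       // /tmp/store
  }
}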
