carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [15/15] carbondata git commit: Rebased with master
Date Mon, 17 Jul 2017 01:57:01 GMT
Rebased with master


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f4ab1ff6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f4ab1ff6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f4ab1ff6

Branch: refs/heads/datamap
Commit: f4ab1ff69d51709b250efbd8bc538f6231994f54
Parents: 2015a3e
Author: Ravindra Pesala <ravi.pesala@gmail.com>
Authored: Thu Jul 13 14:18:02 2017 +0530
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Mon Jul 17 07:26:27 2017 +0530

----------------------------------------------------------------------
 .../core/indexstore/UnsafeMemoryDMStore.java    | 27 +++----
 .../blockletindex/BlockletDataMap.java          | 11 +--
 .../core/memory/UnsafeMemoryManager.java        | 16 ++--
 .../hadoop/api/CarbonTableInputFormat.java      | 81 ++++++++++----------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 +-
 5 files changed, 67 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f4ab1ff6/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 8246f99..737586e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -19,9 +19,9 @@ package org.apache.carbondata.core.indexstore;
 import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 
 import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET;
 import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe;
@@ -39,8 +39,6 @@ public class UnsafeMemoryDMStore {
 
   private int runningLength;
 
-  private MemoryAllocator memoryAllocator;
-
   private boolean isMemoryFreed;
 
   private DataMapSchema[] schema;
@@ -49,11 +47,10 @@ public class UnsafeMemoryDMStore {
 
   private int rowCount;
 
-  public UnsafeMemoryDMStore(DataMapSchema[] schema) {
+  public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException {
     this.schema = schema;
-    this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator();
     this.allocatedSize = capacity;
-    this.memoryBlock = memoryAllocator.allocate(allocatedSize);
+    this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize);
     this.pointers = new int[1000];
   }
 
@@ -63,13 +60,13 @@ public class UnsafeMemoryDMStore {
    *
    * @param rowSize
    */
-  private void ensureSize(int rowSize) {
+  private void ensureSize(int rowSize) throws MemoryException {
     if (runningLength + rowSize >= allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity);
+          UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize + capacity);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       allocatedSize = allocatedSize + capacity;
       memoryBlock = allocate;
     }
@@ -86,7 +83,7 @@ public class UnsafeMemoryDMStore {
    * @param indexRow
    * @return
    */
-  public void addIndexRowToUnsafe(DataMapRow indexRow) {
+  public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException {
     // First calculate the required memory to keep the row in unsafe
     int rowSize = indexRow.getTotalSizeInBytes();
     // Check whether allocated memory is sufficient or not.
@@ -168,13 +165,13 @@ public class UnsafeMemoryDMStore {
     return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]);
   }
 
-  public void finishWriting() {
+  public void finishWriting() throws MemoryException {
     if (runningLength < allocatedSize) {
       MemoryBlock allocate =
-          MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength);
+          UnsafeMemoryManager.allocateMemoryWithRetry(runningLength);
       unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(),
           allocate.getBaseObject(), allocate.getBaseOffset(), runningLength);
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       memoryBlock = allocate;
     }
     // Compact pointers.
@@ -187,7 +184,7 @@ public class UnsafeMemoryDMStore {
 
   public void freeMemory() {
     if (!isMemoryFreed) {
-      memoryAllocator.free(memoryBlock);
+      UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock);
       isMemoryFreed = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f4ab1ff6/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 79aa091..680852d 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.indexstore.row.DataMapRowImpl;
 import org.apache.carbondata.core.indexstore.schema.DataMapSchema;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
@@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
       if (unsafeMemoryDMStore != null) {
         unsafeMemoryDMStore.finishWriting();
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
@@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable {
         DataOutput dataOutput = new DataOutputStream(stream);
         blockletInfo.write(dataOutput);
         serializedData = stream.toByteArray();
-      } catch (IOException e) {
+        row.setByteArray(serializedData, ordinal);
+        unsafeMemoryDMStore.addIndexRowToUnsafe(row);
+      } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      row.setByteArray(serializedData, ordinal);
-      unsafeMemoryDMStore.addIndexRowToUnsafe(row);
     }
   }
 
@@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     return minRow;
   }
 
-  private void createSchema(SegmentProperties segmentProperties) {
+  private void createSchema(SegmentProperties segmentProperties) throws MemoryException {
     List<DataMapSchema> indexSchemas = new ArrayList<>();
 
     // Index key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f4ab1ff6/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 28e63a9..c491908 100644
--- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -101,11 +101,9 @@ public class UnsafeMemoryManager {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
-      if (LOGGER.isDebugEnabled()) {
-        set.add(allocate);
-        LOGGER.error("Memory block (" + allocate + ") is created with size "  + allocate.size() +
-            ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
-      }
+      set.add(allocate);
+      LOGGER.info("Memory block (" + allocate + ") is created with size "  + allocate.size() +
+          ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes");
       return allocate;
     }
     return null;
@@ -115,11 +113,9 @@ public class UnsafeMemoryManager {
     allocator.free(memoryBlock);
     memoryUsed -= memoryBlock.size();
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    if (LOGGER.isDebugEnabled()) {
-      set.remove(memoryBlock);
-      LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
-          "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
-    }
+    set.remove(memoryBlock);
+    LOGGER.info("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed +
+        "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size());
   }
 
   private synchronized long getAvailableMemory() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f4ab1ff6/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index e73c04a..8938699 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.UpdateVO;
@@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   private static final String FILTER_PREDICATE =
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
-  private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table";
+  private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
 
+  // a cache for carbon table, it will be used in task side
+  private CarbonTable carbonTable;
+
   /**
-   * It is optional, if user does not set then it reads from store
-   *
-   * @param configuration
-   * @param carbonTable
-   * @throws IOException
+   * Set the `tableInfo` in `configuration`
    */
-  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+  public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
       throws IOException {
-    if (null != carbonTable) {
-      configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable));
+    if (null != tableInfo) {
+      configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo));
     }
   }
 
-  public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
-    String carbonTableStr = configuration.get(CARBON_TABLE);
-    if (carbonTableStr == null) {
-      populateCarbonTable(configuration);
-      // read it from schema file in the store
-      carbonTableStr = configuration.get(CARBON_TABLE);
-      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
-    }
-    return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
+  /**
+   * Get TableInfo object from `configuration`
+   */
+  private TableInfo getTableInfo(Configuration configuration) throws IOException {
+    String tableInfoStr = configuration.get(TABLE_INFO);
+    return (TableInfo) ObjectSerializationUtil.convertStringToObject(tableInfoStr);
   }
 
   /**
-   * this method will read the schema from the physical file and populate into CARBON_TABLE
-   *
-   * @param configuration
-   * @throws IOException
+   * Get the cached CarbonTable or create it by TableInfo in `configuration`
    */
-  private static void populateCarbonTable(Configuration configuration) throws IOException {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
+  private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    if (carbonTable == null) {
+      // carbon table should be created either from deserialized table info (schema saved in
+      // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+      TableInfo tableInfo = getTableInfo(configuration);
+      CarbonTable carbonTable;
+      if (tableInfo != null) {
+        carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+      } else {
+        carbonTable = SchemaReader.readCarbonTableFromStore(
+            getAbsoluteTableIdentifier(configuration));
+      }
+      this.carbonTable = carbonTable;
+      return carbonTable;
+    } else {
+      return this.carbonTable;
     }
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
-    // read the schema file to get the absoluteTableIdentifier having the correct table id
-    // persisted in the schema
-    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
-    setCarbonTable(configuration, carbonTable);
   }
 
   public static void setTablePath(Configuration configuration, String tablePath)
       throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
-
   /**
    * It sets unresolved filter expression.
    *
@@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles));
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+  private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
       throws IOException {
-    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
 
     // process and resolve the expression
     Expression filter = getFilterPredicates(job.getConfiguration());
-    CarbonTable carbonTable = getCarbonTable(job.getConfiguration());
+    CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     // this will be null in case of corrupt schema file.
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
@@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     Boolean isIUDTable = false;
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
+        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(absoluteTableIdentifier);
 
@@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
-    CarbonTable carbonTable = getCarbonTable(configuration);
+    CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
     // getting the table absoluteTableIdentifier from the carbonTable
     // to avoid unnecessary deserialization
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f4ab1ff6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 6bc7564..2e737ab 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    CarbonInputFormat.setTableInfo(job.getConfiguration,
+    CarbonTableInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
     // initialise query_id for job


Mime
View raw message