carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [37/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
Date Tue, 10 Oct 2017 03:08:24 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
new file mode 100644
index 0000000..e6db5e2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files.  Files are broken into lines.
+ * Values are the line of csv files.
+ */
+public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
+
+  public static final String DELIMITER = "carbon.csvinputformat.delimiter";
+  public static final String DELIMITER_DEFAULT = ",";
+  public static final String COMMENT = "carbon.csvinputformat.comment";
+  public static final String COMMENT_DEFAULT = "#";
+  public static final String QUOTE = "carbon.csvinputformat.quote";
+  public static final String QUOTE_DEFAULT = "\"";
+  public static final String ESCAPE = "carbon.csvinputformat.escape";
+  public static final String ESCAPE_DEFAULT = "\\";
+  // NOTE(review): "caron" is a typo for "carbon". The literal is deliberately NOT corrected:
+  // any deployment that sets this property by its raw key would silently lose the setting if
+  // the value changed. Callers inside this class always go through the constant, so behavior
+  // is consistent either way.
+  public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
+  public static final boolean HEADER_PRESENT_DEFAULT = false;
+  public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+  public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
+  public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns";
+  public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000;
+  public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000;
+
+  // Fixed: the logger is now final, and is named via getName() -- class.toString() would have
+  // produced a logger called "class org.apache.carbondata...".
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CSVInputFormat.class.getName());
+
+  @Override
+  public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CSVRecordReader();
+  }
+
+  /**
+   * A file is splittable when it is either uncompressed or compressed with a codec that
+   * supports seeking to block boundaries (e.g. bzip2).
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
+        .getCodec(file);
+    if (null == codec) {
+      // Plain text files can be split at arbitrary offsets.
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+
+  /**
+   * Sets the comment char to configuration. Default it is #.
+   * Null or empty input leaves the configuration untouched.
+   * @param configuration job configuration to update
+   * @param commentChar single-character comment marker
+   */
+  public static void setCommentCharacter(Configuration configuration, String commentChar) {
+    if (commentChar != null && !commentChar.isEmpty()) {
+      configuration.set(COMMENT, commentChar);
+    }
+  }
+
+  /**
+   * Sets the delimiter to configuration. Default it is ','
+   * Null or empty input leaves the configuration untouched.
+   * @param configuration job configuration to update
+   * @param delimiter single-character field delimiter
+   */
+  public static void setCSVDelimiter(Configuration configuration, String delimiter) {
+    if (delimiter != null && !delimiter.isEmpty()) {
+      configuration.set(DELIMITER, delimiter);
+    }
+  }
+
+  /**
+   * Sets the escape character to configuration. Default it is \
+   * Null or empty input leaves the configuration untouched.
+   * @param configuration job configuration to update
+   * @param escapeCharacter single-character quote escape
+   */
+  public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
+    if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
+      configuration.set(ESCAPE, escapeCharacter);
+    }
+  }
+
+  /**
+   * Whether header needs to read from csv or not. By default it is false.
+   * @param configuration job configuration to update
+   * @param headerExtractEnable true when the first line of the file is a header row
+   */
+  public static void setHeaderExtractionEnabled(Configuration configuration,
+      boolean headerExtractEnable) {
+    configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
+  }
+
+  /**
+   * Sets the quote character to configuration. Default it is "
+   * Null or empty input leaves the configuration untouched.
+   * @param configuration job configuration to update
+   * @param quoteCharacter single-character quote marker
+   */
+  public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
+    if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
+      configuration.set(QUOTE, quoteCharacter);
+    }
+  }
+
+  /**
+   * Sets the read buffer size (in bytes, as a decimal string) to configuration.
+   * Null or empty input leaves the configuration untouched.
+   * @param configuration job configuration to update
+   * @param bufferSize buffer size in bytes, as a decimal string
+   */
+  public static void setReadBufferSize(Configuration configuration, String bufferSize) {
+    if (bufferSize != null && !bufferSize.isEmpty()) {
+      configuration.set(READ_BUFFER_SIZE, bufferSize);
+    }
+  }
+
+  /**
+   * Sets the maximum number of columns the parser will accept per row.
+   * Null or empty input leaves the configuration untouched (the parser then falls back to
+   * {@link #DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING}).
+   * @param configuration job configuration to update
+   * @param maxColumns maximum column count, as a decimal string
+   */
+  public static void setMaxColumns(Configuration configuration, String maxColumns) {
+    // Fixed: also skip empty strings, consistent with the other setters; an empty value would
+    // previously have caused a NumberFormatException when the record reader parsed it.
+    if (maxColumns != null && !maxColumns.isEmpty()) {
+      configuration.set(MAX_COLUMNS, maxColumns);
+    }
+  }
+
+  /**
+   * Records the expected number of columns of the CSV in the configuration.
+   * @param configuration job configuration to update
+   * @param numberOfColumns column count, as a decimal string
+   */
+  public static void setNumberOfColumns(Configuration configuration, String numberOfColumns) {
+    configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
+  }
+
+  /**
+   * Treats value as line in file. Key is null.
+   */
+  public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
+
+    // Byte offset (possibly codec-adjusted) where this split starts / ends.
+    private long start;
+    private long end;
+    // Wraps the raw stream in the uncompressed case so reading stops at the split boundary.
+    private BoundedInputStream boundedInputStream;
+    private Reader reader;
+    private CsvParser csvParser;
+    private StringArrayWritable value;
+    private String[] columns;
+    // Used by getPos() for compressed input, where the logical position lives in the codec stream.
+    private Seekable filePosition;
+    private boolean isCompressedInput;
+    private Decompressor decompressor;
+
+    /**
+     * Opens the split, positioning the stream at the first complete line: any split that does
+     * not begin at offset 0 skips its (partial) first line, which belongs to the previous split.
+     */
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit split = (FileSplit) inputSplit;
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      Configuration job = context.getConfiguration();
+      CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
+      FileSystem fs = file.getFileSystem(job);
+      int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+      FSDataInputStream fileIn = fs.open(file, bufferSize);
+      InputStream inputStream;
+      if (codec != null) {
+        isCompressedInput = true;
+        decompressor = CodecPool.getDecompressor(codec);
+        if (codec instanceof SplittableCompressionCodec) {
+          SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
+              .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
+                  .READ_MODE.BYBLOCK);
+          // The codec may move the boundaries to the nearest compression-block edges.
+          start = scIn.getAdjustedStart();
+          end = scIn.getAdjustedEnd();
+          if (start != 0) {
+            LineReader lineReader = new LineReader(scIn, 1);
+            start += lineReader.readLine(new Text(), 0);
+          }
+          filePosition = scIn;
+          inputStream = scIn;
+        } else {
+          // Non-splittable codec: this single split covers the whole file.
+          CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
+          filePosition = cIn;
+          inputStream = cIn;
+        }
+      } else {
+        fileIn.seek(start);
+        if (start != 0) {
+          // Skip the partial first line; it is read by the reader of the previous split.
+          LineReader lineReader = new LineReader(fileIn, 1);
+          start += lineReader.readLine(new Text(), 0);
+        }
+        boundedInputStream = new BoundedInputStream(fileIn, end - start);
+        filePosition = fileIn;
+        inputStream = boundedInputStream;
+      }
+      reader = new InputStreamReader(inputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      csvParser = new CsvParser(extractCsvParserSettings(job));
+      csvParser.beginParsing(reader);
+    }
+
+    /**
+     * Builds the univocity parser settings from the job configuration.
+     */
+    private CsvParserSettings extractCsvParserSettings(Configuration job) {
+      CsvParserSettings parserSettings = new CsvParserSettings();
+      parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
+      parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
+      parserSettings.setLineSeparatorDetectionEnabled(true);
+      parserSettings.setNullValue("");
+      parserSettings.setEmptyValue("");
+      parserSettings.setIgnoreLeadingWhitespaces(false);
+      parserSettings.setIgnoreTrailingWhitespaces(false);
+      parserSettings.setSkipEmptyLines(false);
+      parserSettings.setMaxCharsPerColumn(100000);
+      // Fixed: Integer.parseInt(job.get(MAX_COLUMNS)) previously threw a
+      // NumberFormatException ("null") whenever MAX_COLUMNS was not configured;
+      // fall back to the documented default in that case.
+      String maxColumns = job.get(MAX_COLUMNS);
+      parserSettings.setMaxColumns(maxColumns == null || maxColumns.isEmpty()
+          ? DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING : Integer.parseInt(maxColumns));
+      parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
+      parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
+      // Only the split that starts at offset 0 can contain the header line.
+      if (start == 0) {
+        parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
+            HEADER_PRESENT_DEFAULT));
+      }
+      return parserSettings;
+    }
+
+    /**
+     * Advances to the next row. Returns false at end of input (or when the reader was
+     * never initialized / already closed).
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (csvParser == null) {
+        return false;
+      }
+      columns = csvParser.parseNext();
+      if (columns == null) {
+        value = null;
+        return false;
+      }
+      if (value == null) {
+        // Lazily allocate and then reuse a single writable for all rows.
+        value = new StringArrayWritable();
+      }
+      value.set(columns);
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    /**
+     * Current byte position within the file, used only for progress reporting.
+     */
+    private long getPos() throws IOException {
+      long retVal = start;
+      if (null != boundedInputStream) {
+        // Uncompressed: position = split end minus what is still unread.
+        retVal = end - boundedInputStream.getRemaining();
+      } else if (isCompressedInput && null != filePosition) {
+        retVal = filePosition.getPos();
+      }
+      return retVal;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // Zero-length splits report 0 progress; otherwise the consumed fraction, capped at 1.
+      return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
+          start) / (float) (end - start));
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+        if (boundedInputStream != null) {
+          boundedInputStream.close();
+        }
+        if (null != csvParser) {
+          csvParser.stopParsing();
+        }
+      } finally {
+        // Null out everything so a repeated close() is a no-op, and return the pooled
+        // decompressor even if closing the streams threw.
+        reader = null;
+        boundedInputStream = null;
+        csvParser = null;
+        filePosition = null;
+        value = null;
+        if (decompressor != null) {
+          CodecPool.returnDecompressor(decompressor);
+          decompressor = null;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVRecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVRecordReaderIterator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVRecordReaderIterator.java
new file mode 100644
index 0000000..24ef9c1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVRecordReaderIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.IOException;
+
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import com.univocity.parsers.common.TextParsingException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * It is wrapper iterator around @{@link RecordReader}.
+ */
+public class CSVRecordReaderIterator extends CarbonIterator<Object []> {
+
+  private RecordReader<NullWritable, StringArrayWritable> recordReader;
+
+  /**
+   * It is just a little hack to make recordreader as iterator. Usually we cannot call hasNext
+   * multiple times on record reader as it moves another line. To avoid that situation like hasNext
+   * only tells whether next row is present or not and next will move the pointer to next row after
+   * consuming it.
+   */
+  private boolean isConsumed;
+
+  private InputSplit split;
+
+  private TaskAttemptContext context;
+
+  /**
+   * @param recordReader underlying reader; {@link #initialize()} must be called before iteration
+   * @param split input split the reader will be initialized with
+   * @param context task context the reader will be initialized with
+   */
+  public CSVRecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
+      InputSplit split, TaskAttemptContext context) {
+    this.recordReader = recordReader;
+    this.split = split;
+    this.context = context;
+  }
+
+  /**
+   * Buffers one row from the reader (if not already buffered) and reports whether it exists.
+   * Safe to call repeatedly without consuming rows.
+   * @throws CarbonDataLoadingException if the underlying reader fails
+   */
+  @Override
+  public boolean hasNext() {
+    try {
+      if (!isConsumed) {
+        isConsumed = recordReader.nextKeyValue();
+        return isConsumed;
+      }
+      return true;
+    } catch (TextParsingException e) {
+      // Fixed: catch the parser failure directly instead of an instanceof check inside a
+      // broad catch (Exception). The trimmed message keeps user-facing errors readable.
+      // NOTE(review): the original cause is dropped here — if CarbonDataLoadingException has a
+      // (String, Throwable) constructor, the cause should be chained; confirm and update.
+      throw new CarbonDataLoadingException(
+          CarbonDataProcessorUtil.trimErrorMessage(e.getMessage()));
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers up the stack still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new CarbonDataLoadingException(e);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Returns the buffered row and marks it consumed, so the following hasNext() advances.
+   */
+  @Override
+  public Object[] next() {
+    try {
+      String[] data = recordReader.getCurrentValue().get();
+      isConsumed = false;
+      return data;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Initializes the wrapped reader with the split and context given at construction.
+   */
+  @Override
+  public void initialize() {
+    try {
+      recordReader.initialize(split, context);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before wrapping.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      recordReader.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/StringArrayWritable.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/StringArrayWritable.java
new file mode 100644
index 0000000..fe4a7ed
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/StringArrayWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A String sequence that is usable as a key or value.
+ */
+public class StringArrayWritable implements Writable {
+
+  // Fixed: serialization previously used Charset.defaultCharset(), which is
+  // platform-dependent -- a Writable produced on one JVM could be corrupted when read on
+  // another JVM whose default charset differs. UTF-8 is now used explicitly on both sides.
+  private static final Charset CHARSET = Charset.forName("UTF-8");
+
+  private String[] values;
+
+  public String[] toStrings() {
+    return values;
+  }
+
+  public void set(String[] values) {
+    this.values = values;
+  }
+
+  public String[] get() {
+    return values;
+  }
+
+  /**
+   * Reads the array back in the format produced by {@link #write(DataOutput)}:
+   * an element count followed by (byte length, UTF-8 bytes) per element.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int length = in.readInt();
+    values = new String[length];
+    for (int i = 0; i < length; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      values[i] = new String(b, CHARSET);
+    }
+  }
+
+  /**
+   * Writes the element count, then each element as a length-prefixed UTF-8 byte array.
+   * NOTE(review): throws NullPointerException if {@link #set(String[])} was never called or
+   * an element is null -- callers are expected to populate the value first.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(values.length);                 // write values
+    for (int i = 0; i < values.length; i++) {
+      byte[] b = values[i].getBytes(CHARSET);
+      out.writeInt(b.length);
+      out.write(b);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DictionaryServerClientDictionary.java
new file mode 100644
index 0000000..25426f6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DictionaryServerClientDictionary.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.dictionary;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+
+/**
+ * Dictionary implementation along with dictionary server client to get new dictionary values
+ */
+public class DictionaryServerClientDictionary implements BiDictionary<Integer, Object> {
+
+  // Pre-created dictionary holding values known before this load; may be null.
+  private Dictionary dictionary;
+
+  // Client used to ask the dictionary server for keys of values not yet in the dictionary.
+  private DictionaryClient client;
+
+  // Cache of values generated during this load, shared across threads (guarded below).
+  private Map<Object, Integer> localCache;
+
+  // Reusable message object; mutated (data / type fields) before each server call.
+  private DictionaryMessage dictionaryMessage;
+
+  // Offset added to server/cache-generated keys so they follow the pre-created keys.
+  private int base;
+
+  public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
+      DictionaryMessage key, Map<Object, Integer> localCache) {
+    this.dictionary = dictionary;
+    this.client = client;
+    this.dictionaryMessage = key;
+    this.localCache = localCache;
+    this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
+  }
+
+  // Returns the existing key for value, or asks the dictionary server to generate one.
+  // Server-generated keys are cached locally (without base) and returned with base added;
+  // keys found by getKey() are returned as-is, since getKey() already applies base where needed.
+  @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    Integer key = getKey(value);
+    if (key == null) {
+      dictionaryMessage.setData(value.toString());
+      DictionaryMessage dictionaryValue = client.getDictionary(dictionaryMessage);
+      key = dictionaryValue.getDictionaryValue();
+      // localCache is shared between loading threads; synchronize the write.
+      synchronized (localCache) {
+        localCache.put(value, key);
+      }
+      return key + base;
+    }
+    return key;
+  }
+
+  // Lookup order: pre-created dictionary first, then the local cache.
+  // Returns null when the value is in neither (the cache-miss path below reassigns key to
+  // the map's null) -- getOrGenerateKey relies on that null to trigger generation.
+  // NOTE(review): when dictionary is null, key stays at the boxed -1 sentinel; this only
+  // falls through to the cache if INVALID_SURROGATE_KEY == -1 -- confirm the constant's value.
+  @Override public Integer getKey(Object value) {
+    Integer key = -1;
+    if (dictionary != null) {
+      key = dictionary.getSurrogateKey(value.toString());
+    }
+    if (key == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+      key = localCache.get(value);
+      if (key != null) {
+        // Cached keys are stored without base; add it on the way out.
+        return key + base;
+      }
+    }
+    return key;
+  }
+
+  // Reverse lookup is intentionally unsupported for this dictionary implementation.
+  @Override public Object getValue(Integer key) {
+    throw new UnsupportedOperationException("Not supported here");
+  }
+
+  // Asks the server for the generated-key count and adds the pre-created portion.
+  // Note: mutates the shared dictionaryMessage's type field as a side effect.
+  @Override public int size() {
+    dictionaryMessage.setType(DictionaryMessageType.SIZE);
+    return client.getDictionary(dictionaryMessage).getDictionaryValue() + base;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
new file mode 100644
index 0000000..165e5a4
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/DirectDictionary.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.dictionary;
+
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+
+/**
+ * It is used for generating dictionary from value itself, like timestamp can be used directly as
+ * dictionary.
+ */
+public class DirectDictionary implements BiDictionary<Integer, Object> {
+
+  /** Generator that derives the surrogate key directly from the value itself. */
+  private final DirectDictionaryGenerator dictionaryGenerator;
+
+  public DirectDictionary(DirectDictionaryGenerator dictionaryGenerator) {
+    this.dictionaryGenerator = dictionaryGenerator;
+  }
+
+  /**
+   * Returns the key derived from the value. A direct dictionary never stores new entries,
+   * so a null key (value not convertible) is an unsupported operation rather than a trigger
+   * for generation.
+   */
+  @Override
+  public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    final Integer key = getKey(value);
+    if (key != null) {
+      return key;
+    }
+    throw new UnsupportedOperationException("trying to add new entry in DirectDictionary");
+  }
+
+  /** Derives the surrogate key from the value's string form. */
+  @Override
+  public Integer getKey(Object value) {
+    return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
+  }
+
+  /** Recovers the original value from a surrogate key. */
+  @Override
+  public Object getValue(Integer key) {
+    return dictionaryGenerator.getValueFromSurrogate(key);
+  }
+
+  /** The key space is the full int range; there is no stored entry count. */
+  @Override
+  public int size() {
+    return Integer.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
new file mode 100644
index 0000000..609fec6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/InMemBiDictionary.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.dictionary;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.devapi.DictionaryGenerator;
+import org.apache.carbondata.core.devapi.GeneratingBiDictionary;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+
+public class InMemBiDictionary<K, V> extends GeneratingBiDictionary<K, V> {
+
+  /** Bidirectional key/value store backing both lookup directions. */
+  private final BiMap<K, V> biMap;
+
+  /**
+   * Constructor to create a new dictionary, dictionary key will be generated by specified generator
+   * @param generator produces a key for each previously unseen value
+   */
+  public InMemBiDictionary(DictionaryGenerator generator) {
+    super(generator);
+    this.biMap = HashBiMap.create();
+  }
+
+  /**
+   * Constructor to create a pre-created dictionary
+   * @param preCreatedDictionary existing key/value pairs; new entries are rejected
+   */
+  public InMemBiDictionary(Map<K, V> preCreatedDictionary) {
+    super(new DictionaryGenerator<K, V>() {
+      @Override
+      public K generateKey(V value) throws DictionaryGenerationException {
+        // Since dictionary is provided by preCreated, normally it should not come here
+        throw new DictionaryGenerationException(
+            "encounter new dictionary value in pre-created dictionary:" + value);
+      }
+    });
+    this.biMap = HashBiMap.create(preCreatedDictionary);
+  }
+
+  /** Reverse lookup: value to key, or null when absent. */
+  @Override
+  public K getKey(V value) {
+    return biMap.inverse().get(value);
+  }
+
+  /** Forward lookup: key to value, or null when absent. */
+  @Override
+  public V getValue(K key) {
+    return biMap.get(key);
+  }
+
+  @Override
+  protected void put(K key, V value) {
+    // dictionary is immutable, it is append only
+    assert (!biMap.containsKey(key));
+    assert (!biMap.containsValue(value));
+    biMap.put(key, value);
+  }
+
+  /** Number of entries currently held. */
+  @Override
+  public int size() {
+    return biMap.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/PreCreatedDictionary.java
new file mode 100644
index 0000000..b7bd9b7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/dictionary/PreCreatedDictionary.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.dictionary;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+
+public class PreCreatedDictionary implements BiDictionary<Integer, Object> {
+
+  private Dictionary dictionary;
+
+  public PreCreatedDictionary(Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  @Override
+  public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    Integer key = getKey(value);
+    if (key == null) {
+      throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary");
+    }
+    return key;
+  }
+
+  @Override
+  public Integer getKey(Object value) {
+    return dictionary.getSurrogateKey(value.toString());
+  }
+
+  @Override
+  public String getValue(Integer key) {
+    return dictionary.getDictionaryValueForKey(key);
+  }
+
+  @Override
+  public int size() {
+    return dictionary.getDictionaryChunks().getSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
new file mode 100644
index 0000000..3c0fe53
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/BadRecordFoundException.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.exception;
+
+public class BadRecordFoundException extends CarbonDataLoadingException {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public BadRecordFoundException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public BadRecordFoundException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t
+   */
+  public BadRecordFoundException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/CarbonDataLoadingException.java
new file mode 100644
index 0000000..9d299a7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/CarbonDataLoadingException.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.exception;
+
/**
 * Unchecked exception reported for failures during carbon data loading.
 *
 * Message handling is delegated to {@link RuntimeException}. The previous
 * private {@code msg} field shadowed the Throwable message and made
 * {@code getMessage()} return "" for instances built with the
 * {@code (Throwable)} constructor; the no-op {@code getLocalizedMessage()}
 * override has also been dropped.
 */
public class CarbonDataLoadingException extends RuntimeException {

  /**
   * default serial version ID.
   */
  private static final long serialVersionUID = 1L;

  /**
   * @param msg The error message for this exception.
   */
  public CarbonDataLoadingException(String msg) {
    super(msg);
  }

  /**
   * @param msg The error message for this exception.
   * @param t   The cause of this exception.
   */
  public CarbonDataLoadingException(String msg, Throwable t) {
    super(msg, t);
  }

  /**
   * @param t The cause of this exception; its string form becomes the message.
   */
  public CarbonDataLoadingException(Throwable t) {
    super(t);
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
new file mode 100644
index 0000000..3533adb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/exception/NoRetryException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.exception;
+
/**
 * Unchecked exception signalling a data-loading failure that must not be
 * retried.
 *
 * Message handling is delegated to {@link RuntimeException}. The previous
 * private {@code msg} field shadowed the Throwable message and made
 * {@code getMessage()} return "" for instances built with the
 * {@code (Throwable)} constructor.
 */
public class NoRetryException extends RuntimeException {

  /**
   * default serial version ID.
   */
  private static final long serialVersionUID = 1L;

  /**
   * @param msg The error message for this exception.
   */
  public NoRetryException(String msg) {
    super(msg);
  }

  /**
   * @param msg The error message for this exception.
   * @param t   The cause of this exception.
   */
  public NoRetryException(String msg, Throwable t) {
    super(msg, t);
  }

  /**
   * @param t The cause of this exception; its string form becomes the message.
   */
  public NoRetryException(Throwable t) {
    super(t);
  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonDataLoadSchema.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonDataLoadSchema.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonDataLoadSchema.java
new file mode 100644
index 0000000..d7aa103
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonDataLoadSchema.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+
+/**
+ * Wrapper Data Load Schema object which will be used to
+ * support relation while data loading
+ */
+public class CarbonDataLoadSchema implements Serializable {
+
+  /**
+   * default serializer
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * CarbonTable info
+   */
+  private CarbonTable carbonTable;
+
+  /**
+   * CarbonDataLoadSchema constructor which takes CarbonTable
+   *
+   * @param carbonTable
+   */
+  public CarbonDataLoadSchema(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  /**
+   * get carbontable
+   *
+   * @return carbonTable
+   */
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
new file mode 100644
index 0000000..39ee270
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+
+public class CarbonLoadModel implements Serializable {
+
+  private static final long serialVersionUID = 6580168429197697465L;
+
+  private String databaseName;
+
+  private String tableName;
+
+  private String factFilePath;
+
+  private String colDictFilePath;
+
+  private String partitionId;
+
+  private CarbonDataLoadSchema carbonDataLoadSchema;
+
+  private boolean aggLoadRequest;
+
+  private String storePath;
+
+  private boolean isRetentionRequest;
+
+  private String csvHeader;
+  private String[] csvHeaderColumns;
+  private String csvDelimiter;
+  private String complexDelimiterLevel1;
+  private String complexDelimiterLevel2;
+
+  private boolean isDirectLoad;
+  private List<LoadMetadataDetails> loadMetadataDetails;
+  private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
+
+  private String blocksID;
+
+  /**
+   * Map from carbon dimension to pre defined dict file path
+   */
+  private HashMap<CarbonDimension, String> predefDictMap;
+
+  /**
+   * task id, each spark task has a unique id
+   */
+  private String taskNo;
+  /**
+   * new load start time
+   */
+  private long factTimeStamp;
+  /**
+   * load Id
+   */
+  private String segmentId;
+
+  private String allDictPath;
+
+  /**
+   * escape Char
+   */
+  private String escapeChar;
+
+  /**
+   * quote Char
+   */
+  private String quoteChar;
+
+  /**
+   * comment Char
+   */
+  private String commentChar;
+
+  private String dateFormat;
+
+  private String defaultTimestampFormat;
+
+  private String defaultDateFormat;
+
+  /**
+   * defines the string that should be treated as null while loadind data
+   */
+  private String serializationNullFormat;
+
+  /**
+   * defines the string to specify whether the bad record logger should be enabled or not
+   */
+  private String badRecordsLoggerEnable;
+
+  /**
+   * defines the option to specify the bad record logger action
+   */
+  private String badRecordsAction;
+
+  /**
+   * Max number of columns that needs to be parsed by univocity parser
+   */
+  private String maxColumns;
+
+  /**
+   * defines the string to specify whether empty data is good or bad
+   */
+  private String isEmptyDataBadRecord;
+
+  /**
+   * Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary sever port
+   */
+  private int dictionaryServerPort;
+
+  /**
+   * Pre fetch data from csv reader
+   */
+  private boolean preFetch;
+
+  /**
+   * Batch sort should be enabled or not
+   */
+  private String sortScope;
+
+  /**
+   * Batch sort size in mb.
+   */
+  private String batchSortSizeInMb;
+  /**
+   * bad record location
+   */
+  private String badRecordsLocation;
+
+  /**
+   * Number of partitions in global sort.
+   */
+  private String globalSortPartitions;
+
+  /**
+   * get escape char
+   *
+   * @return
+   */
+  public String getEscapeChar() {
+    return escapeChar;
+  }
+
+  /**
+   * set escape char
+   *
+   * @param escapeChar
+   */
+  public void setEscapeChar(String escapeChar) {
+    this.escapeChar = escapeChar;
+  }
+
+  public String getCsvDelimiter() {
+    return csvDelimiter;
+  }
+
+  public void setCsvDelimiter(String csvDelimiter) {
+    this.csvDelimiter = csvDelimiter;
+  }
+
+  public String getComplexDelimiterLevel1() {
+    return complexDelimiterLevel1;
+  }
+
+  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
+    this.complexDelimiterLevel1 = complexDelimiterLevel1;
+  }
+
+  public String getComplexDelimiterLevel2() {
+    return complexDelimiterLevel2;
+  }
+
+  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
+    this.complexDelimiterLevel2 = complexDelimiterLevel2;
+  }
+
+  public boolean isDirectLoad() {
+    return isDirectLoad;
+  }
+
+  public void setDirectLoad(boolean isDirectLoad) {
+    this.isDirectLoad = isDirectLoad;
+  }
+
+  public String getAllDictPath() {
+    return allDictPath;
+  }
+
+  public void setAllDictPath(String allDictPath) {
+    this.allDictPath = allDictPath;
+  }
+
+  public String getCsvHeader() {
+    return csvHeader;
+  }
+
+  public void setCsvHeader(String csvHeader) {
+    this.csvHeader = csvHeader;
+  }
+
+  public String[] getCsvHeaderColumns() {
+    return csvHeaderColumns;
+  }
+
+  public void setCsvHeaderColumns(String[] csvHeaderColumns) {
+    this.csvHeaderColumns = csvHeaderColumns;
+  }
+
+  public void initPredefDictMap() {
+    predefDictMap = new HashMap<>();
+  }
+
+  public String getPredefDictFilePath(CarbonDimension dimension) {
+    return predefDictMap.get(dimension);
+  }
+
+  public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
+    this.predefDictMap.put(dimension, predefDictFilePath);
+  }
+
+  /**
+   * @return carbon dataload schema
+   */
+  public CarbonDataLoadSchema getCarbonDataLoadSchema() {
+    return carbonDataLoadSchema;
+  }
+
+  /**
+   * @param carbonDataLoadSchema
+   */
+  public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
+    this.carbonDataLoadSchema = carbonDataLoadSchema;
+  }
+
+  /**
+   * @return the databaseName
+   */
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  /**
+   * @param databaseName the databaseName to set
+   */
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * @param tableName the tableName to set
+   */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the factFilePath
+   */
+  public String getFactFilePath() {
+    return factFilePath;
+  }
+
+  /**
+   * @param factFilePath the factFilePath to set
+   */
+  public void setFactFilePath(String factFilePath) {
+    this.factFilePath = factFilePath;
+  }
+
+  /**
+   * @return external column dictionary file path
+   */
+  public String getColDictFilePath() {
+    return colDictFilePath;
+  }
+
+  /**
+   * set external column dictionary file path
+   *
+   * @param colDictFilePath
+   */
+  public void setColDictFilePath(String colDictFilePath) {
+    this.colDictFilePath = colDictFilePath;
+  }
+
+  /**
+   * get copy with partition
+   *
+   * @param uniqueId
+   * @return
+   */
+  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
+    CarbonLoadModel copy = new CarbonLoadModel();
+    copy.tableName = tableName;
+    copy.factFilePath = factFilePath + '/' + uniqueId;
+    copy.databaseName = databaseName;
+    copy.partitionId = uniqueId;
+    copy.aggLoadRequest = aggLoadRequest;
+    copy.loadMetadataDetails = loadMetadataDetails;
+    copy.isRetentionRequest = isRetentionRequest;
+    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.carbonDataLoadSchema = carbonDataLoadSchema;
+    copy.blocksID = blocksID;
+    copy.taskNo = taskNo;
+    copy.factTimeStamp = factTimeStamp;
+    copy.segmentId = segmentId;
+    copy.serializationNullFormat = serializationNullFormat;
+    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copy.badRecordsAction = badRecordsAction;
+    copy.escapeChar = escapeChar;
+    copy.quoteChar = quoteChar;
+    copy.commentChar = commentChar;
+    copy.dateFormat = dateFormat;
+    copy.defaultTimestampFormat = defaultTimestampFormat;
+    copy.maxColumns = maxColumns;
+    copy.storePath = storePath;
+    copy.useOnePass = useOnePass;
+    copy.dictionaryServerHost = dictionaryServerHost;
+    copy.dictionaryServerPort = dictionaryServerPort;
+    copy.preFetch = preFetch;
+    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copy.sortScope = sortScope;
+    copy.batchSortSizeInMb = batchSortSizeInMb;
+    copy.badRecordsLocation = badRecordsLocation;
+    return copy;
+  }
+
+  /**
+   * Get copy with taskNo.
+   * Broadcast value is shared in process, so we need to copy it to make sure the value in each
+   * task independently.
+   *
+   * @return
+   */
+  public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
+    CarbonLoadModel copy = new CarbonLoadModel();
+    copy.tableName = tableName;
+    copy.factFilePath = factFilePath;
+    copy.databaseName = databaseName;
+    copy.partitionId = partitionId;
+    copy.aggLoadRequest = aggLoadRequest;
+    copy.loadMetadataDetails = loadMetadataDetails;
+    copy.isRetentionRequest = isRetentionRequest;
+    copy.csvHeader = csvHeader;
+    copy.csvHeaderColumns = csvHeaderColumns;
+    copy.isDirectLoad = isDirectLoad;
+    copy.csvDelimiter = csvDelimiter;
+    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.carbonDataLoadSchema = carbonDataLoadSchema;
+    copy.blocksID = blocksID;
+    copy.taskNo = taskNo;
+    copy.factTimeStamp = factTimeStamp;
+    copy.segmentId = segmentId;
+    copy.serializationNullFormat = serializationNullFormat;
+    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copy.badRecordsAction = badRecordsAction;
+    copy.escapeChar = escapeChar;
+    copy.quoteChar = quoteChar;
+    copy.commentChar = commentChar;
+    copy.dateFormat = dateFormat;
+    copy.defaultTimestampFormat = defaultTimestampFormat;
+    copy.maxColumns = maxColumns;
+    copy.storePath = storePath;
+    copy.useOnePass = useOnePass;
+    copy.dictionaryServerHost = dictionaryServerHost;
+    copy.dictionaryServerPort = dictionaryServerPort;
+    copy.preFetch = preFetch;
+    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copy.sortScope = sortScope;
+    copy.batchSortSizeInMb = batchSortSizeInMb;
+    return copy;
+  }
+
+  /**
+   * get CarbonLoadModel with partition
+   *
+   * @param uniqueId
+   * @param filesForPartition
+   * @param header
+   * @param delimiter
+   * @return
+   */
+  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
+      String header, String delimiter) {
+    CarbonLoadModel copyObj = new CarbonLoadModel();
+    copyObj.tableName = tableName;
+    copyObj.factFilePath = null;
+    copyObj.databaseName = databaseName;
+    copyObj.partitionId = uniqueId;
+    copyObj.aggLoadRequest = aggLoadRequest;
+    copyObj.loadMetadataDetails = loadMetadataDetails;
+    copyObj.isRetentionRequest = isRetentionRequest;
+    copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
+    copyObj.csvHeader = header;
+    copyObj.csvHeaderColumns = csvHeaderColumns;
+    copyObj.isDirectLoad = true;
+    copyObj.csvDelimiter = delimiter;
+    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copyObj.blocksID = blocksID;
+    copyObj.taskNo = taskNo;
+    copyObj.factTimeStamp = factTimeStamp;
+    copyObj.segmentId = segmentId;
+    copyObj.serializationNullFormat = serializationNullFormat;
+    copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copyObj.badRecordsAction = badRecordsAction;
+    copyObj.escapeChar = escapeChar;
+    copyObj.quoteChar = quoteChar;
+    copyObj.commentChar = commentChar;
+    copyObj.dateFormat = dateFormat;
+    copyObj.defaultTimestampFormat = defaultTimestampFormat;
+    copyObj.maxColumns = maxColumns;
+    copyObj.storePath = storePath;
+    copyObj.useOnePass = useOnePass;
+    copyObj.dictionaryServerHost = dictionaryServerHost;
+    copyObj.dictionaryServerPort = dictionaryServerPort;
+    copyObj.preFetch = preFetch;
+    copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
+    copyObj.sortScope = sortScope;
+    copyObj.batchSortSizeInMb = batchSortSizeInMb;
+    copyObj.badRecordsLocation = badRecordsLocation;
+    return copyObj;
+  }
+
+  /**
+   * @return the partitionId
+   */
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * @param partitionId the partitionId to set
+   */
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * @param storePath The storePath to set.
+   */
+  public void setStorePath(String storePath) {
+    this.storePath = storePath;
+  }
+
+  /**
+   * @return Returns the factStoreLocation.
+   */
+  public String getStorePath() {
+    return storePath;
+  }
+
+  /**
+   * isRetentionRequest
+   *
+   * @return
+   */
+  public boolean isRetentionRequest() {
+    return isRetentionRequest;
+  }
+
+  /**
+   * getLoadMetadataDetails.
+   *
+   * @return
+   */
+  public List<LoadMetadataDetails> getLoadMetadataDetails() {
+    return loadMetadataDetails;
+  }
+
+  /**
+   * setLoadMetadataDetails.
+   *
+   * @param loadMetadataDetails
+   */
+  public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
+    this.loadMetadataDetails = loadMetadataDetails;
+  }
+
+  /**
+   * getSegmentUpdateStatusManager
+   *
+   * @return
+   */
+  public SegmentUpdateStatusManager getSegmentUpdateStatusManager() {
+    return segmentUpdateStatusManager;
+  }
+
+  /**
+   * setSegmentUpdateStatusManager
+   *
+   * @param segmentUpdateStatusManager
+   */
+  public void setSegmentUpdateStatusManager(SegmentUpdateStatusManager segmentUpdateStatusManager) {
+    this.segmentUpdateStatusManager = segmentUpdateStatusManager;
+  }
+
+  /**
+   * @return
+   */
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  /**
+   * @param taskNo
+   */
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * @return
+   */
+  public long getFactTimeStamp() {
+    return factTimeStamp;
+  }
+
+  /**
+   * @param factTimeStamp
+   */
+  public void setFactTimeStamp(long factTimeStamp) {
+    this.factTimeStamp = factTimeStamp;
+  }
+
+  public String[] getDelimiters() {
+    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+  }
+
+  /**
+   * @return load Id
+   */
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * @param segmentId
+   */
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  /**
+   * the method returns the value to be treated as null while data load
+   *
+   * @return
+   */
+  public String getSerializationNullFormat() {
+    return serializationNullFormat;
+  }
+
+  /**
+   * the method sets the value to be treated as null while data load
+   *
+   * @param serializationNullFormat
+   */
+  public void setSerializationNullFormat(String serializationNullFormat) {
+    this.serializationNullFormat = serializationNullFormat;
+  }
+
+  /**
+   * returns the string to enable bad record logger
+   *
+   * @return
+   */
+  public String getBadRecordsLoggerEnable() {
+    return badRecordsLoggerEnable;
+  }
+
+  /**
+   * method sets the string to specify whether to enable or dissable the badrecord logger.
+   *
+   * @param badRecordsLoggerEnable
+   */
+  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
+    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
+  }
+
+  public String getQuoteChar() {
+    return quoteChar;
+  }
+
+  public void setQuoteChar(String quoteChar) {
+    this.quoteChar = quoteChar;
+  }
+
+  public String getCommentChar() {
+    return commentChar;
+  }
+
+  public void setCommentChar(String commentChar) {
+    this.commentChar = commentChar;
+  }
+
+  public String getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+
+  public String getDefaultTimestampFormat() {
+    return defaultTimestampFormat;
+  }
+
+  public void setDefaultTimestampFormat(String defaultTimestampFormat) {
+    this.defaultTimestampFormat = defaultTimestampFormat;
+  }
+
+  /**
+   * @return
+   */
+  public String getMaxColumns() {
+    return maxColumns;
+  }
+
+  /**
+   * @param maxColumns
+   */
+  public void setMaxColumns(String maxColumns) {
+    this.maxColumns = maxColumns;
+  }
+
+  /**
+   * returns option to specify the bad record logger action
+   *
+   * @return
+   */
+  public String getBadRecordsAction() {
+    return badRecordsAction;
+  }
+
+  /**
+   * set option to specify the bad record logger action
+   *
+   * @param badRecordsAction
+   */
+  public void setBadRecordsAction(String badRecordsAction) {
+    this.badRecordsAction = badRecordsAction;
+  }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
+
+  public boolean isPreFetch() {
+    return preFetch;
+  }
+
+  public void setPreFetch(boolean preFetch) {
+    this.preFetch = preFetch;
+  }
+
+  public String getDefaultDateFormat() {
+    return defaultDateFormat;
+  }
+
+  public void setDefaultDateFormat(String defaultDateFormat) {
+    this.defaultDateFormat = defaultDateFormat;
+  }
+
+  public String getIsEmptyDataBadRecord() {
+    return isEmptyDataBadRecord;
+  }
+
+  public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
+    this.isEmptyDataBadRecord = isEmptyDataBadRecord;
+  }
+
+  public String getSortScope() {
+    return sortScope;
+  }
+
+  public void setSortScope(String sortScope) {
+    this.sortScope = sortScope;
+  }
+
+  public String getBatchSortSizeInMb() {
+    return batchSortSizeInMb;
+  }
+
+  public void setBatchSortSizeInMb(String batchSortSizeInMb) {
+    this.batchSortSizeInMb = batchSortSizeInMb;
+  }
+
+  public String getGlobalSortPartitions() {
+    return globalSortPartitions;
+  }
+
+  public void setGlobalSortPartitions(String globalSortPartitions) {
+    this.globalSortPartitions = globalSortPartitions;
+  }
+
+  public String getBadRecordsLocation() {
+    return badRecordsLocation;
+  }
+
+  public void setBadRecordsLocation(String badRecordsLocation) {
+    this.badRecordsLocation = badRecordsLocation;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
new file mode 100644
index 0000000..0ee1d90
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.loading.parser.impl.ArrayParserImpl;
+import org.apache.carbondata.processing.loading.parser.impl.PrimitiveParserImpl;
+import org.apache.carbondata.processing.loading.parser.impl.StructParserImpl;
+
+public final class CarbonParserFactory {
+
+  /**
+   * Create parser for the carbon column.
+   *
+   * @param carbonColumn
+   * @param complexDelimiters
+   * @return
+   */
+  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+      String nullFormat) {
+    return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
+  }
+
+  /**
+   * This method may be called recursively if the carbon column is complex type.
+   *
+   * @param carbonColumn
+   * @param complexDelimiters, these delimiters which are used to separate the complex data types.
+   * @param depth              It is like depth of tree, if column has children then depth is 1,
+   *                           And depth becomes 2 if children has children.
+   *                           This depth is used select the complex
+   *                           delimiters
+   * @return GenericParser
+   */
+  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+      String nullFormat, int depth) {
+    switch (carbonColumn.getDataType()) {
+      case ARRAY:
+        List<CarbonDimension> listOfChildDimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create array parser with complex delimiter
+        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
+        for (CarbonDimension dimension : listOfChildDimensions) {
+          arrayParser
+              .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+        }
+        return arrayParser;
+      case STRUCT:
+        List<CarbonDimension> dimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create struct parser with complex delimiter
+        StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
+        for (CarbonDimension dimension : dimensions) {
+          parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+        }
+        return parser;
+      case MAP:
+        throw new UnsupportedOperationException("Complex type Map is not supported yet");
+      default:
+        return new PrimitiveParserImpl();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/ComplexParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/ComplexParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/ComplexParser.java
new file mode 100644
index 0000000..c20766a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/ComplexParser.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser;
+
+/**
+ * It parses data string as per complex data type.
+ */
/**
 * Parser for complex (nested) data types such as array and struct; child parsers
 * handle the nested elements.
 */
public interface ComplexParser<E> extends GenericParser<E> {

  /**
   * Adds a child parser that parses one nested element/level of this complex type.
   * @param parser parser for the child element
   */
  void addChildren(GenericParser parser);
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/GenericParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/GenericParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/GenericParser.java
new file mode 100644
index 0000000..792c409
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/GenericParser.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser;
+
+/**
+ * Parse the data according to implementation, The implementation classes can be struct, array or
+ * map datatypes.
+ * It remains thread safe as the state of implementation class should not change while
+ * calling @{@link GenericParser#parse(Object)} method
+ */
/**
 * Parses one column value according to its data type; implementations exist for
 * struct, array and primitive datatypes.
 * Implementations are expected to keep their state unchanged while
 * calling @{@link GenericParser#parse(Object)}, so parsing remains thread safe.
 */
public interface GenericParser<E> {

  /**
   * Parses the raw value (typically a delimited string) into the target type.
   * @param data raw value to parse; may be null
   * @return the parsed value
   */
  E parse(Object data);

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/RowParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/RowParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/RowParser.java
new file mode 100644
index 0000000..9a74a41
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/RowParser.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser;
+
+/**
+ * Parse the complete row at once.
+ */
/**
 * Parses a complete input row at once, producing the row in the shape expected by
 * the subsequent load steps.
 */
public interface RowParser {

  /**
   * Parse row.
   * @param row input row to be parsed.
   * @return parsed row.
   */
  Object[] parseRow(Object[] row);

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
new file mode 100644
index 0000000..c56691a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.parser.ComplexParser;
+import org.apache.carbondata.processing.loading.parser.GenericParser;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * It parses the string to @{@link ArrayObject} using delimiter.
+ * It is thread safe as the state of class don't change while
+ * calling @{@link GenericParser#parse(Object)} method
+ */
+public class ArrayParserImpl implements ComplexParser<ArrayObject> {
+
+  private Pattern pattern;
+
+  private GenericParser child;
+
+  private String nullFormat;
+
+  public ArrayParserImpl(String delimiter, String nullFormat) {
+    pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+    this.nullFormat = nullFormat;
+  }
+
+  @Override
+  public ArrayObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          Object[] array = new Object[split.length];
+          for (int i = 0; i < split.length; i++) {
+            array[i] = child.parse(split[i]);
+          }
+          return new ArrayObject(array);
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void addChildren(GenericParser parser) {
+    child = parser;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/PrimitiveParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/PrimitiveParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/PrimitiveParserImpl.java
new file mode 100644
index 0000000..12172bc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/PrimitiveParserImpl.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import org.apache.carbondata.processing.loading.parser.GenericParser;
+
/**
 * Parser for primitive columns: returns the raw value unchanged, since primitive
 * values need no complex-type splitting at this stage.
 */
public class PrimitiveParserImpl implements GenericParser<Object> {

  /** Identity parse: the input object is returned as-is (including null). */
  @Override
  public Object parse(Object data) {
    return data;
  }

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
new file mode 100644
index 0000000..6f7c398
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.parser.CarbonParserFactory;
+import org.apache.carbondata.processing.loading.parser.GenericParser;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+
/**
 * Default {@link RowParser}: maps raw CSV row columns (header order) to the
 * configured data fields and runs a per-column {@link GenericParser} on each value.
 */
public class RowParserImpl implements RowParser {

  // One parser per input field, in the order produced by getInput().
  private GenericParser[] genericParsers;

  // outputMapping[i] = position in the output row for input field i.
  private int[] outputMapping;

  // inputMapping[i] = header/raw-row column index that feeds input field i.
  private int[] inputMapping;

  // Number of columns in the header; raw rows shorter than this are padded in parseRow().
  private int numberOfColumns;

  /**
   * Builds the parsers and the input->output position mappings.
   *
   * @param output        data fields in the order the output row must follow
   * @param configuration load configuration supplying header, fields, delimiters
   *                      and the serialization null format
   */
  public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
    String[] complexDelimiters =
        (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
    String nullFormat =
        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
            .toString();
    DataField[] input = getInput(configuration);
    genericParsers = new GenericParser[input.length];
    for (int i = 0; i < genericParsers.length; i++) {
      genericParsers[i] =
          CarbonParserFactory.createParser(input[i].getColumn(), complexDelimiters, nullFormat);
    }
    // NOTE(review): outputMapping is sized by output.length but filled/indexed by input
    // positions — assumes every input field matches some output field; confirm callers
    // always pass output covering all fields.
    outputMapping = new int[output.length];
    for (int i = 0; i < input.length; i++) {
      for (int j = 0; j < output.length; j++) {
        if (input[i].getColumn().equals(output[j].getColumn())) {
          outputMapping[i] = j;
          break;
        }
      }
    }
  }

  /**
   * Orders the configured data fields by their position in the CSV header and
   * records, per field, which raw-row column index feeds it (inputMapping).
   * Also captures the header width used for padding short rows.
   *
   * NOTE(review): if a field's column name never appears in the header, trailing
   * entries of the returned array stay null, which would NPE in the constructor —
   * assumes the header always covers every data field; verify against callers.
   *
   * @param configuration load configuration supplying fields and header
   * @return data fields reordered to header order
   */
  public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
    DataField[] fields = configuration.getDataFields();
    String[] header = configuration.getHeader();
    numberOfColumns = header.length;
    DataField[] input = new DataField[fields.length];
    inputMapping = new int[input.length];
    int k = 0;
    for (int i = 0; i < fields.length; i++) {
      for (int j = 0; j < numberOfColumns; j++) {
        // header match is case-insensitive
        if (header[j].equalsIgnoreCase(fields[i].getColumn().getColName())) {
          input[k] = fields[i];
          inputMapping[k] = j;
          k++;
          break;
        }
      }
    }
    return input;
  }

  /**
   * Parses one raw row: pads short rows to header width (missing cells become null),
   * then runs each column's parser and places the result at its mapped output position.
   *
   * @param row raw row values, positionally aligned with the CSV header
   * @return parsed row aligned to the output field order
   */
  @Override
  public Object[] parseRow(Object[] row) {
    // If number of columns are less in a row then create new array with same size of header.
    if (row.length < numberOfColumns) {
      String[] temp = new String[numberOfColumns];
      System.arraycopy(row, 0, temp, 0, row.length);
      row = temp;
    }
    Object[] out = new Object[genericParsers.length];
    for (int i = 0; i < genericParsers.length; i++) {
      Object obj = row[inputMapping[i]];
      // NOTE(review): out is sized by the input-field count while outputMapping holds
      // output positions — assumes input and output field counts are equal; confirm.
      out[outputMapping[i]] = genericParsers[i].parse(obj);
    }
    return out;
  }

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
new file mode 100644
index 0000000..26744b0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.parser.ComplexParser;
+import org.apache.carbondata.processing.loading.parser.GenericParser;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * It parses the string to @{@link StructObject} using delimiter.
+ * It is thread safe as the state of class don't change while
+ * calling @{@link GenericParser#parse(Object)} method
+ */
+public class StructParserImpl implements ComplexParser<StructObject> {
+
+  private Pattern pattern;
+
+  private List<GenericParser> children = new ArrayList<>();
+
+  private String nullFormat;
+
+  public StructParserImpl(String delimiter, String nullFormat) {
+    pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+    this.nullFormat = nullFormat;
+  }
+
+  @Override
+  public StructObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          Object[] array = new Object[children.size()];
+          for (int i = 0; i < split.length && i < array.length; i++) {
+            array[i] = children.get(i).parse(split[i]);
+          }
+          return new StructObject(array);
+        }
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void addChildren(GenericParser parser) {
+    children.add(parser);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/Partitioner.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/Partitioner.java
new file mode 100644
index 0000000..f71d407
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/Partitioner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition;
+
+/**
+ * Partitions the data as per key
+ */
/**
 * Partitions the data as per key.
 */
public interface Partitioner<Key> {

  /**
   * Returns the partition (bucket) index for the given key.
   * @param key value to partition on
   * @return partition index
   */
  int getPartition(Key key);

}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
new file mode 100644
index 0000000..06bd716
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/HashPartitionerImpl.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+
+/**
+ * Hash partitioner implementation
+ */
+public class HashPartitionerImpl implements Partitioner<Object[]> {
+
+  private int numberOfBuckets;
+
+  private Hash[] hashes;
+
+  public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas,
+      int numberOfBuckets) {
+    this.numberOfBuckets = numberOfBuckets;
+    hashes = new Hash[indexes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      switch (columnSchemas.get(i).getDataType()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          hashes[i] = new IntegralHash(indexes.get(i));
+          break;
+        case DOUBLE:
+        case FLOAT:
+        case DECIMAL:
+          hashes[i] = new DecimalHash(indexes.get(i));
+          break;
+        default:
+          hashes[i] = new StringHash(indexes.get(i));
+      }
+    }
+  }
+
+  @Override public int getPartition(Object[] objects) {
+    int hashCode = 0;
+    for (Hash hash : hashes) {
+      hashCode += hash.getHash(objects);
+    }
+    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
+  }
+
+  private interface Hash {
+    int getHash(Object[] value);
+  }
+
+  private static class IntegralHash implements Hash {
+
+    private int index;
+
+    private IntegralHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class DecimalHash implements Hash {
+
+    private int index;
+
+    private DecimalHash(int index) {
+      this.index = index;
+    }
+
+    public int getHash(Object[] value) {
+      return value[index] != null ? Double.valueOf(value[index].toString()).hashCode() : 0;
+    }
+  }
+
+  private static class StringHash implements Hash {
+
+    private int index;
+
+    private StringHash(int index) {
+      this.index = index;
+    }
+
+    @Override public int getHash(Object[] value) {
+      return value[index] != null ? value[index].hashCode() : 0;
+    }
+  }
+}


Mime
View raw message