carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject [10/20] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module
Date Sun, 01 Oct 2017 01:43:26 GMT
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
deleted file mode 100644
index 82605a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-
-/**
- * Dictionary implementation along with dictionary server client to get new dictionary values
- */
-public class DictionaryServerClientDictionary implements BiDictionary<Integer, Object> {
-
-  private Dictionary dictionary;
-
-  private DictionaryClient client;
-
-  private Map<Object, Integer> localCache;
-
-  private DictionaryMessage dictionaryMessage;
-
-  private int base;
-
-  public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
-      DictionaryMessage key, Map<Object, Integer> localCache) {
-    this.dictionary = dictionary;
-    this.client = client;
-    this.dictionaryMessage = key;
-    this.localCache = localCache;
-    this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
-  }
-
-  @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
-    Integer key = getKey(value);
-    if (key == null) {
-      dictionaryMessage.setData(value.toString());
-      DictionaryMessage dictionaryValue = client.getDictionary(dictionaryMessage);
-      key = dictionaryValue.getDictionaryValue();
-      synchronized (localCache) {
-        localCache.put(value, key);
-      }
-      return key + base;
-    }
-    return key;
-  }
-
-  @Override public Integer getKey(Object value) {
-    Integer key = -1;
-    if (dictionary != null) {
-      key = dictionary.getSurrogateKey(value.toString());
-    }
-    if (key == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
-      key = localCache.get(value);
-      if (key != null) {
-        return key + base;
-      }
-    }
-    return key;
-  }
-
-  @Override public Object getValue(Integer key) {
-    throw new UnsupportedOperationException("Not supported here");
-  }
-
-  @Override public int size() {
-    dictionaryMessage.setType(DictionaryMessageType.SIZE);
-    return client.getDictionary(dictionaryMessage).getDictionaryValue() + base;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
deleted file mode 100644
index e6cd42f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-
-/**
- * It is used for generating dictionary from value itself, like timestamp can be used directly as
- * dictionary.
- */
-public class DirectDictionary implements BiDictionary<Integer, Object> {
-
-  private DirectDictionaryGenerator dictionaryGenerator;
-
-  public DirectDictionary(DirectDictionaryGenerator dictionaryGenerator) {
-    this.dictionaryGenerator = dictionaryGenerator;
-  }
-
-  @Override
-  public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
-    Integer key = getKey(value);
-    if (key == null) {
-      throw new UnsupportedOperationException("trying to add new entry in DirectDictionary");
-    }
-    return key;
-  }
-
-  @Override
-  public Integer getKey(Object value) {
-    return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
-  }
-
-  @Override
-  public Object getValue(Integer key) {
-    return dictionaryGenerator.getValueFromSurrogate(key);
-  }
-
-  @Override public int size() {
-    return Integer.MAX_VALUE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
deleted file mode 100644
index 7b6d5f1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import java.util.Map;
-
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.devapi.DictionaryGenerator;
-import org.apache.carbondata.core.devapi.GeneratingBiDictionary;
-
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
-
-public class InMemBiDictionary<K, V> extends GeneratingBiDictionary<K, V> {
-
-  private BiMap<K, V> biMap;
-
-  /**
-   * Constructor to create a new dictionary, dictionary key will be generated by specified generator
-   * @param generator
-   */
-  public InMemBiDictionary(DictionaryGenerator generator) {
-    super(generator);
-    biMap = HashBiMap.create();
-  }
-
-  /**
-   * Constructor to create a pre-created dictionary
-   * @param preCreatedDictionary
-   */
-  public InMemBiDictionary(Map<K, V> preCreatedDictionary) {
-    super(new DictionaryGenerator<K, V>() {
-      @Override
-      public K generateKey(V value) throws DictionaryGenerationException {
-        // Since dictionary is provided by preCreated, normally it should not come here
-        throw new DictionaryGenerationException(
-            "encounter new dictionary value in pre-created dictionary:" + value);
-      }
-    });
-    biMap = HashBiMap.create(preCreatedDictionary);
-  }
-
-  @Override
-  public K getKey(V value) {
-    return biMap.inverse().get(value);
-  }
-
-  @Override
-  public V getValue(K key) {
-    return biMap.get(key);
-  }
-
-  @Override
-  protected void put(K key, V value) {
-    // dictionary is immutable, it is append only
-    assert (!biMap.containsKey(key));
-    assert (!biMap.containsValue(value));
-    biMap.put(key, value);
-  }
-
-  @Override
-  public int size() {
-    return biMap.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
deleted file mode 100644
index 19b1cf3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.dictionary;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-
-public class PreCreatedDictionary implements BiDictionary<Integer, Object> {
-
-  private Dictionary dictionary;
-
-  public PreCreatedDictionary(Dictionary dictionary) {
-    this.dictionary = dictionary;
-  }
-
-  @Override
-  public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
-    Integer key = getKey(value);
-    if (key == null) {
-      throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary");
-    }
-    return key;
-  }
-
-  @Override
-  public Integer getKey(Object value) {
-    return dictionary.getSurrogateKey(value.toString());
-  }
-
-  @Override
-  public String getValue(Integer key) {
-    return dictionary.getDictionaryValueForKey(key);
-  }
-
-  @Override
-  public int size() {
-    return dictionary.getDictionaryChunks().getSize();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
deleted file mode 100644
index eb95528..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.exception;
-
-public class BadRecordFoundException extends CarbonDataLoadingException {
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public BadRecordFoundException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public BadRecordFoundException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public BadRecordFoundException(Throwable t) {
-    super(t);
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
deleted file mode 100644
index 6ffdd03..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.exception;
-
-public class CarbonDataLoadingException extends RuntimeException {
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonDataLoadingException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonDataLoadingException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public CarbonDataLoadingException(Throwable t) {
-    super(t);
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
deleted file mode 100644
index 027b2d0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/NoRetryException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.exception;
-
-public class NoRetryException extends RuntimeException {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public NoRetryException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public NoRetryException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param t
-   */
-  public NoRetryException(Throwable t) {
-    super(t);
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
deleted file mode 100644
index c37e782..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
-import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
-import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
-
-public final class CarbonParserFactory {
-
-  /**
-   * Create parser for the carbon column.
-   *
-   * @param carbonColumn
-   * @param complexDelimiters
-   * @return
-   */
-  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
-      String nullFormat) {
-    return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
-  }
-
-  /**
-   * This method may be called recursively if the carbon column is complex type.
-   *
-   * @param carbonColumn
-   * @param complexDelimiters, these delimiters which are used to separate the complex data types.
-   * @param depth              It is like depth of tree, if column has children then depth is 1,
-   *                           And depth becomes 2 if children has children.
-   *                           This depth is used select the complex
-   *                           delimiters
-   * @return GenericParser
-   */
-  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
-      String nullFormat, int depth) {
-    switch (carbonColumn.getDataType()) {
-      case ARRAY:
-        List<CarbonDimension> listOfChildDimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create array parser with complex delimiter
-        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
-        for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayParser
-              .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
-        }
-        return arrayParser;
-      case STRUCT:
-        List<CarbonDimension> dimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create struct parser with complex delimiter
-        StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
-        for (CarbonDimension dimension : dimensions) {
-          parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
-        }
-        return parser;
-      case MAP:
-        throw new UnsupportedOperationException("Complex type Map is not supported yet");
-      default:
-        return new PrimitiveParserImpl();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
deleted file mode 100644
index 60247a1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/ComplexParser.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * It parses data string as per complex data type.
- */
-public interface ComplexParser<E> extends GenericParser<E> {
-
-  /**
-   * Children to this parser.
-   * @param parser
-   */
-  void addChildren(GenericParser parser);
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
deleted file mode 100644
index b745bed..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * Parse the data according to implementation, The implementation classes can be struct, array or
- * map datatypes.
- * It remains thread safe as the state of implementation class should not change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public interface GenericParser<E> {
-
-  /**
-   * Parse the data as per the delimiter
-   * @param data
-   * @return
-   */
-  E parse(Object data);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
deleted file mode 100644
index 9795e90..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/RowParser.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser;
-
-/**
- * Parse the complete row at once.
- */
-public interface RowParser {
-
-  /**
-   * Parse row.
-   * @param row input row to be parsed.
-   * @return parsed row.
-   */
-  Object[] parseRow(Object[] row);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
deleted file mode 100644
index 11bbc78..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import java.util.regex.Pattern;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
-import org.apache.carbondata.processing.newflow.parser.ComplexParser;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-import org.apache.commons.lang.ArrayUtils;
-
-/**
- * It parses the string to @{@link ArrayObject} using delimiter.
- * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public class ArrayParserImpl implements ComplexParser<ArrayObject> {
-
-  private Pattern pattern;
-
-  private GenericParser child;
-
-  private String nullFormat;
-
-  public ArrayParserImpl(String delimiter, String nullFormat) {
-    pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
-    this.nullFormat = nullFormat;
-  }
-
-  @Override
-  public ArrayObject parse(Object data) {
-    if (data != null) {
-      String value = data.toString();
-      if (!value.isEmpty() && !value.equals(nullFormat)) {
-        String[] split = pattern.split(value, -1);
-        if (ArrayUtils.isNotEmpty(split)) {
-          Object[] array = new Object[split.length];
-          for (int i = 0; i < split.length; i++) {
-            array[i] = child.parse(split[i]);
-          }
-          return new ArrayObject(array);
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void addChildren(GenericParser parser) {
-    child = parser;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
deleted file mode 100644
index 2cf5633..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-public class PrimitiveParserImpl implements GenericParser<Object> {
-
-  @Override
-  public Object parse(Object data) {
-    return data;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
deleted file mode 100644
index 61e4a31..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-import org.apache.carbondata.processing.newflow.parser.RowParser;
-
-public class RowParserImpl implements RowParser {
-
-  private GenericParser[] genericParsers;
-
-  private int[] outputMapping;
-
-  private int[] inputMapping;
-
-  private int numberOfColumns;
-
-  public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
-    String[] complexDelimiters =
-        (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
-    String nullFormat =
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
-            .toString();
-    DataField[] input = getInput(configuration);
-    genericParsers = new GenericParser[input.length];
-    for (int i = 0; i < genericParsers.length; i++) {
-      genericParsers[i] =
-          CarbonParserFactory.createParser(input[i].getColumn(), complexDelimiters, nullFormat);
-    }
-    outputMapping = new int[output.length];
-    for (int i = 0; i < input.length; i++) {
-      for (int j = 0; j < output.length; j++) {
-        if (input[i].getColumn().equals(output[j].getColumn())) {
-          outputMapping[i] = j;
-          break;
-        }
-      }
-    }
-  }
-
-  public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
-    DataField[] fields = configuration.getDataFields();
-    String[] header = configuration.getHeader();
-    numberOfColumns = header.length;
-    DataField[] input = new DataField[fields.length];
-    inputMapping = new int[input.length];
-    int k = 0;
-    for (int i = 0; i < fields.length; i++) {
-      for (int j = 0; j < numberOfColumns; j++) {
-        if (header[j].equalsIgnoreCase(fields[i].getColumn().getColName())) {
-          input[k] = fields[i];
-          inputMapping[k] = j;
-          k++;
-          break;
-        }
-      }
-    }
-    return input;
-  }
-
-  @Override
-  public Object[] parseRow(Object[] row) {
-    // If number of columns are less in a row then create new array with same size of header.
-    if (row.length < numberOfColumns) {
-      String[] temp = new String[numberOfColumns];
-      System.arraycopy(row, 0, temp, 0, row.length);
-      row = temp;
-    }
-    Object[] out = new Object[genericParsers.length];
-    for (int i = 0; i < genericParsers.length; i++) {
-      Object obj = row[inputMapping[i]];
-      out[outputMapping[i]] = genericParsers[i].parse(obj);
-    }
-    return out;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
deleted file mode 100644
index 3969d9a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.parser.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
-import org.apache.carbondata.processing.newflow.parser.ComplexParser;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
-
-import org.apache.commons.lang.ArrayUtils;
-
-/**
- * It parses the string to @{@link StructObject} using delimiter.
- * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(Object)} method
- */
-public class StructParserImpl implements ComplexParser<StructObject> {
-
-  private Pattern pattern;
-
-  private List<GenericParser> children = new ArrayList<>();
-
-  private String nullFormat;
-
-  public StructParserImpl(String delimiter, String nullFormat) {
-    pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
-    this.nullFormat = nullFormat;
-  }
-
-  @Override
-  public StructObject parse(Object data) {
-    if (data != null) {
-      String value = data.toString();
-      if (!value.isEmpty() && !value.equals(nullFormat)) {
-        String[] split = pattern.split(value, -1);
-        if (ArrayUtils.isNotEmpty(split)) {
-          Object[] array = new Object[children.size()];
-          for (int i = 0; i < split.length && i < array.length; i++) {
-            array[i] = children.get(i).parse(split[i]);
-          }
-          return new StructObject(array);
-        }
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public void addChildren(GenericParser parser) {
-    children.add(parser);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
deleted file mode 100644
index 85de593..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/Partitioner.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.partition;
-
-/**
- * Partitions the data as per key
- */
-public interface Partitioner<Key> {
-
-  int getPartition(Key key);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
deleted file mode 100644
index 42f48f0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/partition/impl/HashPartitionerImpl.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.partition.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.processing.newflow.partition.Partitioner;
-
-/**
- * Hash partitioner implementation
- */
-public class HashPartitionerImpl implements Partitioner<Object[]> {
-
-  private int numberOfBuckets;
-
-  private Hash[] hashes;
-
-  public HashPartitionerImpl(List<Integer> indexes, List<ColumnSchema> columnSchemas,
-      int numberOfBuckets) {
-    this.numberOfBuckets = numberOfBuckets;
-    hashes = new Hash[indexes.size()];
-    for (int i = 0; i < indexes.size(); i++) {
-      switch (columnSchemas.get(i).getDataType()) {
-        case SHORT:
-        case INT:
-        case LONG:
-          hashes[i] = new IntegralHash(indexes.get(i));
-          break;
-        case DOUBLE:
-        case FLOAT:
-        case DECIMAL:
-          hashes[i] = new DecimalHash(indexes.get(i));
-          break;
-        default:
-          hashes[i] = new StringHash(indexes.get(i));
-      }
-    }
-  }
-
-  @Override public int getPartition(Object[] objects) {
-    int hashCode = 0;
-    for (Hash hash : hashes) {
-      hashCode += hash.getHash(objects);
-    }
-    return (hashCode & Integer.MAX_VALUE) % numberOfBuckets;
-  }
-
-  private interface Hash {
-    int getHash(Object[] value);
-  }
-
-  private static class IntegralHash implements Hash {
-
-    private int index;
-
-    private IntegralHash(int index) {
-      this.index = index;
-    }
-
-    public int getHash(Object[] value) {
-      return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0;
-    }
-  }
-
-  private static class DecimalHash implements Hash {
-
-    private int index;
-
-    private DecimalHash(int index) {
-      this.index = index;
-    }
-
-    public int getHash(Object[] value) {
-      return value[index] != null ? Double.valueOf(value[index].toString()).hashCode() : 0;
-    }
-  }
-
-  private static class StringHash implements Hash {
-
-    private int index;
-
-    private StringHash(int index) {
-      this.index = index;
-    }
-
-    @Override public int getHash(Object[] value) {
-      return value[index] != null ? value[index].hashCode() : 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
deleted file mode 100644
index 1de55e0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.row;
-
-import java.util.NoSuchElementException;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-
-
-/**
- * Batch of rows.
- */
-public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
-
-  private CarbonRow[] rowBatch;
-
-  private int size = 0;
-
-  private int index = 0;
-
-  public CarbonRowBatch(int batchSize) {
-    this.rowBatch = new CarbonRow[batchSize];
-  }
-
-  public void addRow(CarbonRow carbonRow) {
-    rowBatch[size++] = carbonRow;
-  }
-
-  public int getSize() {
-    return size;
-  }
-
-  @Override public boolean hasNext() {
-    return index < size;
-  }
-
-  @Override
-  public CarbonRow next() throws NoSuchElementException {
-    if (hasNext()) {
-      return rowBatch[index++];
-    }
-    throw new NoSuchElementException("no more elements to iterate");
-  }
-
-  @Override public void remove() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
deleted file mode 100644
index ba96a96..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonSortBatch.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.row;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-
-/**
- * Batch of sorted rows which are ready to be processed by
- */
-public class CarbonSortBatch extends CarbonRowBatch {
-
-  private UnsafeSingleThreadFinalSortFilesMerger iterator;
-
-  public CarbonSortBatch(UnsafeSingleThreadFinalSortFilesMerger iterator) {
-    super(0);
-    this.iterator = iterator;
-  }
-
-  @Override public boolean hasNext() {
-    return iterator.hasNext();
-  }
-
-  @Override public CarbonRow next() {
-    return new CarbonRow(iterator.next());
-  }
-
-  @Override public void close() {
-    iterator.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
deleted file mode 100644
index 5179baa..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.sort.impl.ThreadStatusObserver;
-
-/**
- * The class defines the common methods used in across various type of sort
- */
-public abstract class AbstractMergeSorter implements Sorter {
-  /**
-   * instance of thread status observer
-   */
-  protected ThreadStatusObserver threadStatusObserver;
-
-  /**
-   * Below method will be used to check error in exception
-   */
-  public void checkError() {
-    if (threadStatusObserver.getThrowable() != null) {
-      if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) {
-        throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
-      } else {
-        throw new CarbonDataLoadingException(threadStatusObserver.getThrowable());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
deleted file mode 100644
index 2bf8e16..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonUtil;
-
-/**
- * Sort scope options
- */
-public class SortScopeOptions {
-
-  public static SortScope getSortScope(String sortScope) {
-    if (sortScope == null) {
-      sortScope = CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT;
-    }
-    switch (sortScope.toUpperCase()) {
-      case "BATCH_SORT":
-        return SortScope.BATCH_SORT;
-      case "LOCAL_SORT":
-        return SortScope.LOCAL_SORT;
-      case "GLOBAL_SORT":
-        return SortScope.GLOBAL_SORT;
-      case "NO_SORT":
-        return SortScope.NO_SORT;
-      default:
-        return SortScope.LOCAL_SORT;
-    }
-  }
-
-  public static boolean isValidSortOption(String sortScope) {
-    return CarbonUtil.isValidSortOption(sortScope);
-  }
-
-  public enum SortScope {
-    NO_SORT, BATCH_SORT, LOCAL_SORT, GLOBAL_SORT;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
deleted file mode 100644
index 62434bc..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-public class SortStepRowUtil {
-  public static Object[] convertRow(Object[] data, SortParameters parameters) {
-    int measureCount = parameters.getMeasureColCount();
-    int dimensionCount = parameters.getDimColCount();
-    int complexDimensionCount = parameters.getComplexDimColCount();
-    int noDictionaryCount = parameters.getNoDictionaryCount();
-    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
-    Object[] holder = new Object[3];
-    int index = 0;
-    int nonDicIndex = 0;
-    int allCount = 0;
-    int[] dim = new int[dimensionCount];
-    byte[][] nonDicArray = new byte[noDictionaryCount + complexDimensionCount][];
-    Object[] measures = new Object[measureCount];
-    try {
-      // read dimension values
-      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-        if (isNoDictionaryDimensionColumn[i]) {
-          nonDicArray[nonDicIndex++] = (byte[]) data[i];
-        } else {
-          dim[index++] = (int) data[allCount];
-        }
-        allCount++;
-      }
-
-      for (int i = 0; i < complexDimensionCount; i++) {
-        nonDicArray[nonDicIndex++] = (byte[]) data[allCount];
-        allCount++;
-      }
-
-      index = 0;
-
-      // read measure values
-      for (int i = 0; i < measureCount; i++) {
-        measures[index++] = data[allCount];
-        allCount++;
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
-      // increment number if record read
-    } catch (Exception e) {
-      throw new RuntimeException("Problem while converting row ", e);
-    }
-
-    //return out row
-    return holder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
deleted file mode 100644
index 4a2f5f4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/Sorter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort;
-
-import java.util.Iterator;
-
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * This interface sorts all the data of iterators.
- * The life cycle of this interface is initialize -> sort -> close
- */
-public interface Sorter {
-
-  /**
-   * Initialize sorter with sort parameters.
-   *
-   * @param sortParameters
-   */
-  void initialize(SortParameters sortParameters);
-
-  /**
-   * Sorts the data of all iterators, this iterators can be
-   * read parallely depends on implementation.
-   *
-   * @param iterators array of iterators to read data.
-   * @return
-   * @throws CarbonDataLoadingException
-   */
-  Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException;
-
-  /**
-   * Close resources
-   */
-  void close();
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
deleted file mode 100644
index 39a21ad..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SorterFactory.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeBatchParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl;
-import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterWithBucketingImpl;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class SorterFactory {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SorterFactory.class.getName());
-
-  public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
-    boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-            CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
-    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    Sorter sorter;
-    if (offheapsort) {
-      if (configuration.getBucketingInfo() != null) {
-        sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields(),
-            configuration.getBucketingInfo());
-      } else {
-        sorter = new UnsafeParallelReadMergeSorterImpl(counter);
-      }
-    } else {
-      if (configuration.getBucketingInfo() != null) {
-        sorter =
-            new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo());
-      } else {
-        sorter = new ParallelReadMergeSorterImpl(counter);
-      }
-    }
-    if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
-      if (configuration.getBucketingInfo() == null) {
-        sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
-      } else {
-        LOGGER.warn(
-            "Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass()
-                .getName());
-      }
-    }
-    return sorter;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
deleted file mode 100644
index 5a8a2c8..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- */
-public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private SortIntermediateFileMerger intermediateFileMerger;
-
-  private SingleThreadFinalSortFilesMerger finalMerger;
-
-  private AtomicLong rowCounter;
-
-  public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
-    this.rowCounter = rowCounter;
-  }
-
-  @Override
-  public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-    intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
-    String[] storeLocations =
-        CarbonDataProcessorUtil.getLocalDataFolderLocation(
-            sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
-            sortParameters.getSegmentId() + "", false, false);
-    // Set the data file location
-    String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
-        File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    finalMerger =
-        new SingleThreadFinalSortFilesMerger(dataFolderLocations, sortParameters.getTableName(),
-            sortParameters.getDimColCount(),
-            sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
-            sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
-            sortParameters.getNoDictionaryDimnesionColumn(),
-            sortParameters.getNoDictionarySortColumn());
-  }
-
-  @Override
-  public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    SortDataRows sortDataRow = new SortDataRows(sortParameters, intermediateFileMerger);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      sortDataRow.initialize();
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(
-            new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter,
-                threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRow, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      intermediateFileMerger.finish();
-      intermediateFileMerger = null;
-      finalMerger.startFinalMerge();
-    } catch (CarbonDataWriterException e) {
-      throw new CarbonDataLoadingException(e);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    // Creates the iterator to read from merge sorter.
-    Iterator<CarbonRowBatch> batchIterator = new CarbonIterator<CarbonRowBatch>() {
-
-      @Override
-      public boolean hasNext() {
-        return finalMerger.hasNext();
-      }
-
-      @Override
-      public CarbonRowBatch next() {
-        int counter = 0;
-        CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-        while (finalMerger.hasNext() && counter < batchSize) {
-          rowBatch.addRow(new CarbonRow(finalMerger.next()));
-          counter++;
-        }
-        return rowBatch;
-      }
-    };
-    return new Iterator[] { batchIterator };
-  }
-
-  @Override public void close() {
-    if (intermediateFileMerger != null) {
-      intermediateFileMerger.close();
-    }
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      // start sorting
-      sortDataRows.startSorting();
-
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(),
-              System.currentTimeMillis());
-      return false;
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private SortDataRows sortDataRows;
-
-    private Object[][] buffer;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver observer;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows,
-        int batchSize, AtomicLong rowCounter, ThreadStatusObserver observer) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.buffer = new Object[batchSize][];
-      this.rowCounter = rowCounter;
-      this.observer = observer;
-
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              buffer[i++] = row.getData();
-            }
-          }
-          if (i > 0) {
-            sortDataRows.addRowBatch(buffer, i);
-            rowCounter.getAndAdd(i);
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        observer.notifyFailed(e);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
deleted file mode 100644
index 7e013e0..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It parallely reads data from array of iterates and do merge sort.
- * First it sorts the data and write to temp files. These temp files will be merge sorted to get
- * final merge sort result.
- * This step is specifically for bucketing, it sorts each bucket data separately and write to
- * temp files.
- */
-public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(ParallelReadMergeSorterWithBucketingImpl.class.getName());
-
-  private SortParameters sortParameters;
-
-  private SortIntermediateFileMerger[] intermediateFileMergers;
-
-  private BucketingInfo bucketingInfo;
-
-  private int sortBufferSize;
-
-  private AtomicLong rowCounter;
-
-  public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
-      BucketingInfo bucketingInfo) {
-    this.rowCounter = rowCounter;
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  @Override public void initialize(SortParameters sortParameters) {
-    this.sortParameters = sortParameters;
-    int buffer = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL));
-    sortBufferSize = buffer / bucketingInfo.getNumberOfBuckets();
-    if (sortBufferSize < 100) {
-      sortBufferSize = 100;
-    }
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators)
-      throws CarbonDataLoadingException {
-    SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()];
-    intermediateFileMergers =
-        new SortIntermediateFileMerger[sortDataRows.length];
-    try {
-      for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-        SortParameters parameters = sortParameters.getCopy();
-        parameters.setPartitionID(i + "");
-        setTempLocation(parameters);
-        parameters.setBufferSize(sortBufferSize);
-        intermediateFileMergers[i] = new SortIntermediateFileMerger(parameters);
-        sortDataRows[i] = new SortDataRows(parameters, intermediateFileMergers[i]);
-        sortDataRows[i].initialize();
-      }
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);
-    this.threadStatusObserver = new ThreadStatusObserver(executorService);
-    final int batchSize = CarbonProperties.getInstance().getBatchSize();
-    try {
-      for (int i = 0; i < iterators.length; i++) {
-        executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter,
-            this.threadStatusObserver));
-      }
-      executorService.shutdown();
-      executorService.awaitTermination(2, TimeUnit.DAYS);
-      processRowToNextStep(sortDataRows, sortParameters);
-    } catch (Exception e) {
-      checkError();
-      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
-    }
-    checkError();
-    try {
-      for (int i = 0; i < intermediateFileMergers.length; i++) {
-        intermediateFileMergers[i].finish();
-      }
-    } catch (CarbonDataWriterException e) {
-      throw new CarbonDataLoadingException(e);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-
-    Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
-    for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) {
-      batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize);
-    }
-
-    return batchIterator;
-  }
-
-  private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
-    // Set the data file location
-    String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
-            sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
-            sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
-            this.sortParameters.getNoDictionarySortColumn());
-  }
-
-  @Override public void close() {
-    for (int i = 0; i < intermediateFileMergers.length; i++) {
-      intermediateFileMergers[i].close();
-    }
-  }
-
-  /**
-   * Below method will be used to process data to next step
-   */
-  private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters)
-      throws CarbonDataLoadingException {
-    if (null == sortDataRows || sortDataRows.length == 0) {
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      LOGGER.info("Number of Records was Zero");
-      String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0;
-      LOGGER.info(logMessage);
-      return false;
-    }
-
-    try {
-      for (int i = 0; i < sortDataRows.length; i++) {
-        // start sorting
-        sortDataRows[i].startSorting();
-      }
-      // check any more rows are present
-      LOGGER.info("Record Processed For table: " + parameters.getTableName());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis());
-      return false;
-    } catch (CarbonSortKeyAndGroupByException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
-    String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
-        CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
-    parameters.setTempFileLocation(tmpLocs);
-  }
-
-  /**
-   * This thread iterates the iterator and adds the rows to @{@link SortDataRows}
-   */
-  private static class SortIteratorThread implements Runnable {
-
-    private Iterator<CarbonRowBatch> iterator;
-
-    private SortDataRows[] sortDataRows;
-
-    private AtomicLong rowCounter;
-
-    private ThreadStatusObserver threadStatusObserver;
-
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows,
-        AtomicLong rowCounter, ThreadStatusObserver observer) {
-      this.iterator = iterator;
-      this.sortDataRows = sortDataRows;
-      this.rowCounter = rowCounter;
-      this.threadStatusObserver = observer;
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (iterator.hasNext()) {
-          CarbonRowBatch batch = iterator.next();
-          int i = 0;
-          while (batch.hasNext()) {
-            CarbonRow row = batch.next();
-            if (row != null) {
-              SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
-              synchronized (sortDataRow) {
-                sortDataRow.addRow(row.getData());
-                rowCounter.getAndAdd(1);
-              }
-            }
-          }
-        }
-      } catch (Exception e) {
-        LOGGER.error(e);
-        this.threadStatusObserver.notifyFailed(e);
-      }
-    }
-
-  }
-
-  private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private String partitionId;
-
-    private int batchSize;
-
-    private boolean firstRow = true;
-
-    public MergedDataIterator(String partitionId, int batchSize) {
-      this.partitionId = partitionId;
-      this.batchSize = batchSize;
-    }
-
-    private SingleThreadFinalSortFilesMerger finalMerger;
-
-    @Override public boolean hasNext() {
-      if (firstRow) {
-        firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
-        finalMerger.startFinalMerge();
-      }
-      return finalMerger.hasNext();
-    }
-
-    @Override public CarbonRowBatch next() {
-      int counter = 0;
-      CarbonRowBatch rowBatch = new CarbonRowBatch(batchSize);
-      while (finalMerger.hasNext() && counter < batchSize) {
-        rowBatch.addRow(new CarbonRow(finalMerger.next()));
-        counter++;
-      }
-      return rowBatch;
-    }
-  }
-}


Mime
View raw message