asterixdb-commits mailing list archives

From buyin...@apache.org
Subject [2/3] asterixdb git commit: Add a dataset rebalance REST API.
Date Sat, 27 May 2017 19:23:53 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm
new file mode 100644
index 0000000..4b9eb7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/nonexist_dataset/nonexist_dataset.1.adm
@@ -0,0 +1 @@
+{"results":"successful"}
\ No newline at end of file

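The expected result above pins down the response body of the new rebalance endpoint: a successful call returns {"results":"successful"}, and (per the test name) rebalancing a nonexistent dataset is treated as a no-op success rather than an error. As a rough sketch of how a client might drive the endpoint added by this patch, the snippet below issues a POST against "/admin/rebalance" (see the Servlets.java hunk further down); the host, port, and query-parameter names are illustrative assumptions, not confirmed by the patch.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class RebalanceClient {
        public static void main(String[] args) throws Exception {
            // Hypothetical host/port and parameter names; only the "/admin/rebalance"
            // path is established by this patch (see Servlets.java below).
            URL url = new URL("http://localhost:19002/admin/rebalance"
                    + "?dataverseName=tpch&datasetName=LineItem&nodes=asterix_nc1,asterix_nc2");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                System.out.println(in.readLine()); // expected: {"results":"successful"}
            }
        }
    }
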
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.10.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm
new file mode 100644
index 0000000..3c6303a
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.11.adm
@@ -0,0 +1 @@
+{ "DatasetName": "LineItem", "GroupName": "LineItem_2", "rebalanceCount": 2 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
new file mode 100644
index 0000000..4b9eb7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
@@ -0,0 +1 @@
+{"results":"successful"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
new file mode 100644
index 0000000..4f0990e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
@@ -0,0 +1 @@
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm
new file mode 100644
index 0000000..2760db0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.7.adm
@@ -0,0 +1 @@
+{ "DatasetName": "LineItem", "GroupName": "LineItem_1", "rebalanceCount": 1 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm
new file mode 100644
index 0000000..4b9eb7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.8.adm
@@ -0,0 +1 @@
+{"results":"successful"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
new file mode 100644
index 0000000..44c244c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.9.adm
@@ -0,0 +1 @@
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_2/tpch/2/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_3/tpch/2/
 LineItem_idx_LineItem"}]}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index f06a0b7..35b7d4c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -100,6 +100,7 @@ public class ErrorCode {
     public static final int COMPILATION_BAD_QUERY_PARAMETER_VALUE = 1037;
     public static final int COMPILATION_ILLEGAL_STATE = 1038;
     public static final int COMPILATION_TWO_PHASE_LOCKING_VIOLATION = 1039;
+    public static final int DATASET_ID_EXHAUSTED = 1040;
 
     // Feed errors
     public static final int DATAFLOW_ILLEGAL_STATE = 3001;

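The new DATASET_ID_EXHAUSTED code (1040) pairs with the message added to asx_errormsg/en.properties below. A minimal sketch of where such a code would be raised, assuming an AsterixException constructor that takes an error code; the allocator itself is hypothetical, standing in for wherever dataset ids are handed out:

    import org.apache.asterix.common.exceptions.AsterixException;
    import org.apache.asterix.common.exceptions.ErrorCode;

    public class DatasetIdAllocatorSketch {
        private int lastId = 0;

        // Hypothetical allocator: dataset ids are ints, so the id space can run out.
        public synchronized int allocate() throws AsterixException {
            if (lastId == Integer.MAX_VALUE) {
                // Resolves to "Dataset id space is exhausted" via en.properties (1040).
                throw new AsterixException(ErrorCode.DATASET_ID_EXHAUSTED);
            }
            return ++lastId;
        }
    }
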
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 3047ef5..60e6060 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -32,6 +32,7 @@ public class Servlets {
     public static final String QUERY_RESULT = "/query/service/result/*";
     public static final String QUERY_SERVICE = "/query/service";
     public static final String CONNECTOR = "/connector";
+    public static final String REBALANCE = "/admin/rebalance";
     public static final String SHUTDOWN = "/admin/shutdown";
     public static final String VERSION = "/admin/version";
     public static final String RUNNING_REQUESTS = "/admin/requests/running/*";

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 1aa1474..2e98abd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -37,7 +37,6 @@ public class StoragePathUtil {
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
-    public static final String ADAPTER_INSTANCE_PREFIX = "adapter_";
 
     private StoragePathUtil() {
     }
@@ -61,16 +60,18 @@ public class StoragePathUtil {
         return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
     }
 
-    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName,
+            long rebalanceCount) {
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceCount));
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
         return dataverseName + File.separator + fullIndexName;
     }
 
-    private static String prepareFullIndexName(String datasetName, String idxName) {
-        return datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceCount) {
+        return (rebalanceCount == 0 ? "" : rebalanceCount + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR
+                + idxName;
     }
 
     public static int getPartitionNumFromName(String name) {

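The effect of the new rebalanceCount parameter is easiest to see on concrete paths: a count of 0 preserves the legacy <dataverse>/<dataset>_idx_<index> layout, while a positive count inserts the count as an extra directory level, so each rebalanced copy of a dataset gets its own storage subtree. A self-contained reproduction of the naming logic above:

    import java.io.File;

    public class IndexPathDemo {
        // Mirrors prepareFullIndexName/prepareDataverseIndexName from the patch above.
        static String dataverseIndexName(String dv, String ds, String idx, long rebalanceCount) {
            return dv + File.separator
                    + (rebalanceCount == 0 ? "" : rebalanceCount + File.separator)
                    + ds + "_idx_" + idx;
        }

        public static void main(String[] args) {
            // Legacy layout: never-rebalanced datasets, and always the metadata datasets.
            System.out.println(dataverseIndexName("tpch", "LineItem", "LineItem", 0));
            // -> tpch/LineItem_idx_LineItem
            // After two rebalances, matching the splits in single_dataset.9.adm above:
            System.out.println(dataverseIndexName("tpch", "LineItem", "LineItem", 2));
            // -> tpch/2/LineItem_idx_LineItem
        }
    }
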
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index bba3a43..1f80fad 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -86,6 +86,7 @@
 1037 = Invalid query parameter %1$s -- value has to be greater than or equal to %2$s bytes
 1038 = Illegal state. %1$s
 1039 = Two-phase locking violation -- locks can not be acquired after unlocking
+1040 = Dataset id space is exhausted
 
 # Feed Errors
 3001 = Illegal state.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index 01ade10..c84a5bd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -39,9 +38,9 @@ import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.IndexUtil;
 
 /**
  * Caches metadata entities such that the MetadataManager does not have to
@@ -161,10 +160,7 @@ public class MetadataCache {
                 // Add the primary index associated with the dataset, if the dataset is an
                 // internal dataset.
                 if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                    InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
-                    Index index = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-                            dataset.getDatasetName(), IndexType.BTREE, id.getPartitioningKey(),
-                            id.getKeySourceIndicator(), id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+                    Index index = IndexUtil.getPrimaryIndex(dataset);
                     addIndexIfNotExistsInternal(index);
                 }
 

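The inline construction deleted here fixes what the new IndexUtil.getPrimaryIndex helper must return; a sketch reconstructed from the removed lines (the real helper lives in org.apache.asterix.metadata.utils.IndexUtil):

    import org.apache.asterix.common.config.DatasetConfig.IndexType;
    import org.apache.asterix.metadata.entities.Dataset;
    import org.apache.asterix.metadata.entities.Index;
    import org.apache.asterix.metadata.entities.InternalDatasetDetails;

    public class IndexUtilSketch {
        public static Index getPrimaryIndex(Dataset dataset) {
            InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
            // The primary index shares the dataset's name and is a BTree on the
            // partitioning keys, exactly as the removed MetadataCache code built it.
            return new Index(dataset.getDataverseName(), dataset.getDatasetName(),
                    dataset.getDatasetName(), IndexType.BTREE, id.getPartitioningKey(),
                    id.getKeySourceIndicator(), id.getPrimaryKeyType(), false, true,
                    dataset.getPendingOp());
        }
    }
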
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 272cced..673a5ae 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -335,10 +335,6 @@ public class MetadataBootstrap {
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName);
         index.setFile(file);
-        // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
-        // a dataset that was not yet created
-        List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager()
-                .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
         ITypeTraits[] typeTraits = index.getTypeTraits();
         IBinaryComparatorFactory[] cmpFactories = index.getKeyBinaryComparatorFactory();
         int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index ee5c9f3..4103a2c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -19,7 +19,6 @@
 
 package org.apache.asterix.metadata.bootstrap;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,6 +26,7 @@ import java.util.List;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -232,7 +232,9 @@ public class MetadataIndex implements IMetadataIndex {
 
     @Override
     public String getFileNameRelativePath() {
-        return getDataverseName() + File.separator + getIndexedDatasetName() + "_idx_" + getIndexName();
+        // The rebalance count for the metadata dataset is always 0.
+        return StoragePathUtil.prepareDataverseIndexName(getDataverseName(), getIndexedDatasetName(), getIndexName(),
+                0);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index b647bb7..49b32c0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -124,6 +124,11 @@ public class MetadataManagerUtil {
         return new DefaultNodeGroupDomain(partitions);
     }
 
+    public static List<String> findNodes(MetadataTransactionContext mdTxnCtx, String nodeGroupName)
+            throws AlgebricksException {
+        return MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroupName).getNodeNames();
+    }
+
     public static Feed findFeed(MetadataTransactionContext mdTxnCtx, String dataverse, String feedName)
             throws AlgebricksException {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 69f4e03..e0cfc28 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -82,7 +82,6 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -302,10 +301,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
     }
 
+    public List<String> findNodes(String nodeGroupName) throws AlgebricksException {
+        return MetadataManagerUtil.findNodes(mdTxnCtx, nodeGroupName);
+    }
+
     public IAType findType(String dataverse, String typeName) throws AlgebricksException {
         return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
     }
 
+    public IAType findType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+    }
+
+    public IAType findMetaType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+    }
+
     public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
         return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
     }
@@ -381,17 +392,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return new Pair<>(dataScanner, constraint);
     }
 
-    public IDataFormat getDataFormat(String dataverseName) throws CompilationException {
-        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        IDataFormat format;
-        try {
-            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new CompilationException(e);
-        }
-        return format;
-    }
-
     public Dataverse findDataverse(String dataverseName) throws CompilationException {
         return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
     }
@@ -760,10 +760,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
         return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }
 
-    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
-            String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx,
-                targetIdxName, temp);
+    public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
+            throws AlgebricksException {
+        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -860,80 +859,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
                 fieldPermutation[i++] = idx;
             }
         }
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String itemTypeName = dataset.getItemTypeName();
-            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset);
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            boolean hasSecondaries = MetadataManager.INSTANCE
-                    .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
-
-            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            IIndexDataflowHelperFactory idfh =
-                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
-            LSMPrimaryUpsertOperatorDescriptor op;
-            ITypeTraits[] outputTypeTraits =
-                    new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-
-            // add the previous record first
-            int f = 0;
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            f++;
-            // add the previous meta second
-            if (dataset.hasMetaPart()) {
-                outputSerDes[f] =
-                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
-                f++;
-            }
-            // add the previous filter third
-            int fieldIdx = -1;
-            if (numFilterFields > 0) {
-                String filterField = DatasetUtil.getFilterField(dataset).get(0);
-                for (i = 0; i < itemType.getFieldNames().length; i++) {
-                    if (itemType.getFieldNames()[i].equals(filterField)) {
-                        break;
-                    }
-                }
-                fieldIdx = i;
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
-                f++;
-            }
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j + f] = recordDesc.getFields()[j];
-            }
-            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
-                    context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory,
-                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
-            return new Pair<>(op, splitsAndConstraint.second);
-
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+                context.getMissingWriterFactory());
     }
 
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
             throws AlgebricksException {
@@ -1635,15 +1565,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
             throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
-                ds.getDatasetDetails().isTemp());
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+        return getSplitProviderAndConstraints(ds, ds.getDatasetName());
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
             String indexName) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
-                ds.getDatasetDetails().isTemp());
+        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 

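Two of the additions above, findType(Dataset) and findMetaType(Dataset), are convenience overloads that let callers stop threading dataverse/type-name pairs by hand; splitsForIndex likewise replaces splitsForDataset and drops the no-longer-needed temp flag. A hedged usage sketch of the type lookups:

    import org.apache.asterix.metadata.declared.MetadataProvider;
    import org.apache.asterix.metadata.entities.Dataset;
    import org.apache.asterix.om.types.ARecordType;
    import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;

    class TypeLookupSketch {
        static ARecordType[] resolve(MetadataProvider mp, Dataset ds) throws AlgebricksException {
            // Before this patch: mp.findType(ds.getItemTypeDataverseName(), ds.getItemTypeName())
            ARecordType itemType = (ARecordType) mp.findType(ds);
            // The meta type is only meaningful for datasets with a meta part.
            ARecordType metaType = ds.hasMetaPart() ? (ARecordType) mp.findMetaType(ds) : null;
            return new ARecordType[] { itemType, metaType };
        }
    }
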
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 34fa7bb..4b31767 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.IntStream;
 
 import org.apache.asterix.active.ActiveLifecycleListener;
 import org.apache.asterix.active.IActiveEntityEventsListener;
@@ -44,6 +45,9 @@ import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.MetadataManager;
@@ -75,17 +79,23 @@ import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondar
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -133,7 +143,9 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
     private final IDatasetDetails datasetDetails;
     private final String metaTypeDataverseName;
     private final String metaTypeName;
+    private final long rebalanceCount;
     private int pendingOp;
+
     /*
      * Transient (For caching)
      */
@@ -151,6 +163,31 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
             DatasetType datasetType, int datasetId, int pendingOp) {
+        this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, metaItemTypeDataverseName,
+                metaItemTypeName, nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints,
+                datasetType, datasetId, pendingOp, 0L);
+    }
+
+    public Dataset(Dataset dataset) {
+        this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
+                dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName,
+                dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails,
+                dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp, dataset.rebalanceCount);
+    }
+
+    public Dataset(Dataset dataset, boolean forRebalance, String targetNodeGroupName) {
+        this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
+                dataset.metaTypeDataverseName, dataset.metaTypeName, targetNodeGroupName,
+                dataset.compactionPolicyFactory,
+                dataset.compactionPolicyProperties, dataset.datasetDetails, dataset.hints, dataset.datasetType,
+                forRebalance ? DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : dataset.datasetId,
+                dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : dataset.rebalanceCount);
+    }
+
+    public Dataset(String dataverseName, String datasetName, String itemTypeDataverseName, String itemTypeName,
+            String metaItemTypeDataverseName, String metaItemTypeName, String nodeGroupName, String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, IDatasetDetails datasetDetails, Map<String, String> hints,
+            DatasetType datasetType, int datasetId, int pendingOp, long rebalanceCount) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.recordTypeName = itemTypeName;
@@ -165,13 +202,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         this.datasetId = datasetId;
         this.pendingOp = pendingOp;
         this.hints = hints;
-    }
-
-    public Dataset(Dataset dataset) {
-        this(dataset.dataverseName, dataset.datasetName, dataset.recordTypeDataverseName, dataset.recordTypeName,
-                dataset.metaTypeDataverseName, dataset.metaTypeName, dataset.nodeGroupName,
-                dataset.compactionPolicyFactory, dataset.compactionPolicyProperties, dataset.datasetDetails,
-                dataset.hints, dataset.datasetType, dataset.datasetId, dataset.pendingOp);
+        this.rebalanceCount = rebalanceCount;
     }
 
     public String getDataverseName() {
@@ -230,6 +261,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         return metaTypeName;
     }
 
+    public long getRebalanceCount() {
+        return rebalanceCount;
+    }
+
     public boolean hasMetaPart() {
         return metaTypeDataverseName != null && metaTypeName != null;
     }
@@ -376,7 +411,7 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         // Drop the associated nodegroup
         String nodegroup = getNodeGroupName();
         if (!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME)) {
-            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), dataverseName + ":" + datasetName);
+            MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx.getValue(), nodegroup);
         }
     }
 
@@ -591,31 +626,26 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         return Objects.hash(dataverseName, datasetName);
     }
 
-    public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
-            MetadataProvider metadataProvider, int[] datasetPartitions, boolean isSink) {
-        return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions,
-                isSink);
-    }
-
     /**
-     * Get the index dataflow helper factory for the dataset's primary index
+     * Gets the commit runtime factory for inserting/upserting/deleting operations on this dataset.
      *
-     * @param mdProvider
-     *            an instance of metadata provider that is used to fetch metadata information
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param jobId,
+     *            the AsterixDB job id for transaction management.
+     * @param primaryKeyFieldPermutation,
+     *            the primary key field permutation according to the input.
+     * @param isSink,
+     *            whether this commit runtime is the last operator in the pipeline.
+     * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset.
      * @throws AlgebricksException
      */
-    public IResourceFactory getResourceFactory(MetadataProvider mdProvider) throws AlgebricksException {
-        if (getDatasetType() != DatasetType.INTERNAL) {
-            throw new AlgebricksException(ErrorCode.ASTERIX,
-                    ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, getDatasetType());
-        }
-        Index index = mdProvider.getIndex(getDataverseName(), getDatasetName(), getDatasetName());
-        ARecordType recordType = (ARecordType) mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
-        ARecordType metaType = (ARecordType) mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtil.getMergePolicyFactory(this, mdProvider.getMetadataTxnContext());
-        return getResourceFactory(mdProvider, index, recordType, metaType, compactionInfo.first, compactionInfo.second);
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, JobId jobId,
+            int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
+        int[] datasetPartitions = getDatasetPartitions(metadataProvider);
+        return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFieldPermutation,
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), datasetPartitions,
+                isSink);
     }
 
     public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
@@ -659,6 +689,57 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
         return typeTraits;
     }
 
+    /**
+     * Gets the record descriptor for primary records of this dataset.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @return the record descriptor for primary records of this dataset.
+     * @throws AlgebricksException
+     */
+    public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider metadataProvider) throws AlgebricksException {
+        List<List<String>> partitioningKeys = getPrimaryKeys();
+        int numPrimaryKeys = partitioningKeys.size();
+        ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1
+                + (hasMetaPart() ? 1 : 0)];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + (hasMetaPart() ? 1 : 0)];
+        ISerializerDeserializerProvider serdeProvider = metadataProvider.getFormat().getSerdeProvider();
+        List<Integer> indicators = null;
+        if (hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator();
+        }
+        ARecordType itemType = (ARecordType) metadataProvider.findType(this);
+        ARecordType metaType = (ARecordType) metadataProvider.findMetaType(this);
+
+        // Set the serde/traits for primary keys
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType = (indicators == null || indicators.get(i) == 0)
+                    ? itemType.getSubFieldType(partitioningKeys.get(i))
+                    : metaType.getSubFieldType(partitioningKeys.get(i));
+            primaryRecFields[i] = serdeProvider.getSerializerDeserializer(keyType);
+            primaryTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        // Set the serde for the record field
+        primaryRecFields[numPrimaryKeys] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        if (hasMetaPart()) {
+            // Set the serde and traits for the meta record field
+            primaryRecFields[numPrimaryKeys + 1] = SerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(metaType);
+            primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
+
+    /**
+     * Gets the comparator factories for the primary key fields of this dataset.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @return the comparator factories for the primary key fields of this dataset.
+     * @throws AlgebricksException
+     */
     public IBinaryComparatorFactory[] getPrimaryComparatorFactories(MetadataProvider metadataProvider,
             ARecordType recordType, ARecordType metaType) throws AlgebricksException {
         IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
@@ -671,22 +752,53 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
             indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator();
         }
         for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType = (indicators == null || indicators.get(i) == 0)
+                    ? recordType.getSubFieldType(partitioningKeys.get(i))
+                    : metaType.getSubFieldType(partitioningKeys.get(i));
+            cmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
+        }
+        return cmpFactories;
+    }
+
+    /**
+     * Gets the hash function factories for the primary key fields of this dataset.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @return the hash function factories for the primary key fields of this dataset.
+     * @throws AlgebricksException
+     */
+    public IBinaryHashFunctionFactory[] getPrimaryHashFunctionFactories(MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        ARecordType recordType = (ARecordType) metadataProvider.findType(this);
+        ARecordType metaType = (ARecordType) metadataProvider.findMetaType(this);
+        List<List<String>> partitioningKeys = getPrimaryKeys();
+        int numPrimaryKeys = partitioningKeys.size();
+        IBinaryHashFunctionFactory[] hashFuncFactories = new IBinaryHashFunctionFactory[numPrimaryKeys];
+        List<Integer> indicators = null;
+        if (hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) getDatasetDetails()).getKeySourceIndicator();
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
             IAType keyType =
                     (indicators == null || indicators.get(i) == 0) ? recordType.getSubFieldType(partitioningKeys.get(i))
                             : metaType.getSubFieldType(partitioningKeys.get(i));
-            cmpFactories[i] = cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
+            hashFuncFactories[i] = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(keyType);
         }
-        return cmpFactories;
+        return hashFuncFactories;
     }
 
     @Override
     public int[] getPrimaryBloomFilterFields() {
         List<List<String>> partitioningKeys = getPrimaryKeys();
         int numPrimaryKeys = partitioningKeys.size();
-        int[] bloomFilterKeyFields = new int[numPrimaryKeys];
-        for (int i = 0; i < numPrimaryKeys; i++) {
-            bloomFilterKeyFields[i] = i;
-        }
-        return bloomFilterKeyFields;
+        return IntStream.range(0, numPrimaryKeys).toArray();
+    }
+
+    // Gets an array of partition numbers for this dataset.
+    protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
+        FileSplit[] splitsForDataset = metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
+                getDatasetName());
+        return IntStream.range(0, splitsForDataset.length).toArray();
     }
 }

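The key addition here is the rebalance copy constructor: the target of a rebalance is a clone of the source dataset that differs only in its node group, its dataset id (freshly drawn via DatasetIdFactory.generateAlternatingDatasetId so the old and new instances can coexist), and a rebalanceCount bumped by one, which in turn selects a fresh storage subtree through the StoragePathUtil change above. A usage sketch; the target node group name is an illustrative assumption:

    import org.apache.asterix.metadata.entities.Dataset;

    class RebalanceTargetSketch {
        // forRebalance = true draws a fresh dataset id and increments rebalanceCount;
        // "LineItem_2" stands in for whatever node group the rebalance creates.
        // Starting from a source with rebalanceCount == 1, the target's metadata
        // record would match single_dataset.11.adm above:
        //   { "DatasetName": "LineItem", "GroupName": "LineItem_2", "rebalanceCount": 2 }
        static Dataset makeTarget(Dataset source) {
            return new Dataset(source, true, "LineItem_2");
        }
    }
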
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index df47c70..56c3e5f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -41,7 +41,6 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> {
 
     private static final long serialVersionUID = 1L;
     public static final int RECORD_INDICATOR = 0;
-    public static final int META_INDICATOR = 1;
 
     private final String dataverseName;
     // Enforced to be unique within a dataverse.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index c3c5023..b9464ed 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -51,8 +51,10 @@ import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AInt8;
 import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.ARecord;
@@ -73,26 +75,26 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
  */
 public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
     private static final long serialVersionUID = 1L;
-    // Field indexes of serialized Dataset in a tuple.
-    // First key field.
-    public static final int DATASET_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
-    // Second key field.
-    public static final int DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
     // Payload field containing serialized Dataset.
     public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+    private static final String REBALANCE_ID_FIELD_NAME = "rebalanceCount";
 
     @SuppressWarnings("unchecked")
     protected final ISerializerDeserializer<ARecord> recordSerDes =
             SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.DATASET_RECORDTYPE);
     protected final transient AMutableInt32 aInt32;
     protected final transient ISerializerDeserializer<AInt32> aInt32Serde;
+    protected final transient AMutableInt64 aBigInt;
+    protected final transient ISerializerDeserializer<AInt64> aBigIntSerde;
     protected final transient ArrayBackedValueStorage fieldName = new ArrayBackedValueStorage();
 
     @SuppressWarnings("unchecked")
     protected DatasetTupleTranslator(boolean getTuple) {
         super(getTuple, MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
         aInt32 = new AMutableInt32(-1);
+        aBigInt = new AMutableInt64(-1);
         aInt32Serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+        aBigIntSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
     }
 
     @Override
@@ -107,7 +109,6 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
     }
 
     protected Dataset createDatasetFromARecord(ARecord datasetRecord) throws HyracksDataException {
-
         String dataverseName =
                 ((AString) datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX))
                         .getStringValue();
@@ -258,9 +259,14 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
             metaTypeName = ((AString) datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue();
         }
 
+        // Read the rebalance count if there is one.
+        int rebalanceCountIndex = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
+        long rebalanceCount = rebalanceCountIndex >= 0
+                ? ((AInt64) datasetRecord.getValueByPos(rebalanceCountIndex)).getLongValue() : 0;
+
         return new Dataset(dataverseName, datasetName, typeDataverseName, typeName, metaTypeDataverseName, metaTypeName,
                 nodeGroupName, compactionPolicy, compactionPolicyProperties, datasetDetails, hints, datasetType,
-                datasetId, pendingOp);
+                datasetId, pendingOp, rebalanceCount);
     }
 
     @Override
@@ -409,6 +415,16 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
             stringSerde.serialize(aString, fieldValue.getDataOutput());
             recordBuilder.addField(fieldName, fieldValue);
         }
+        if (dataset.getRebalanceCount() > 0) {
+            // Adds the field rebalanceCount.
+            fieldName.reset();
+            aString.setValue("rebalanceCount");
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aBigInt.setValue(dataset.getRebalanceCount());
+            aBigIntSerde.serialize(aBigInt, fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
     }
 
     protected void writeDatasetDetailsRecordType(IARecordBuilder recordBuilder, Dataset dataset, DataOutput dataOutput)

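Note the backward-compatibility pattern: rebalanceCount is written only when it is positive, and the reader probes for the field by name rather than by fixed position, so Dataset records persisted before this patch (which lack the field) deserialize with a count of 0. The read side, extracted as a standalone helper:

    import org.apache.asterix.om.base.AInt64;
    import org.apache.asterix.om.base.ARecord;

    class OptionalFieldReadSketch {
        // Mirrors createDatasetFromARecord above: an absent field defaults to 0.
        static long readRebalanceCount(ARecord datasetRecord) {
            int idx = datasetRecord.getType().getFieldIndex("rebalanceCount");
            return idx >= 0 ? ((AInt64) datasetRecord.getValueByPos(idx)).getLongValue() : 0L;
        }
    }
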
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 098645e..6801427 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.metadata.utils;
 
 import java.io.DataOutput;
-import java.io.File;
 import java.rmi.RemoteException;
 import java.util.List;
 import java.util.Map;
@@ -29,11 +28,17 @@ import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -47,25 +52,40 @@ import org.apache.asterix.om.base.AString;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -217,8 +237,7 @@ public class DatasetUtil {
 
     public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider)
             throws AlgebricksException, HyracksDataException, RemoteException, ACIDException {
-        String datasetPath = dataset.getDataverseName() + File.separator + dataset.getDatasetName();
-        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        LOGGER.info("DROP DATASET: " + dataset);
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         }
@@ -249,41 +268,32 @@ public class DatasetUtil {
         return spec;
     }
 
-    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, String datasetName,
-            MetadataProvider metadataProvider) throws AlgebricksException {
-        String dataverseName = dataverse.getDataverseName();
-        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
-        }
-        Index index = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), dataverseName,
-                datasetName, datasetName);
-        ARecordType itemType =
-                (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+    public static JobSpecification createDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        Index index = IndexUtil.getPrimaryIndex(dataset);
+        ARecordType itemType = (ARecordType) metadataProvider.findType(dataset);
         // get meta item type
         ARecordType metaItemType = null;
         if (dataset.hasMetaPart()) {
-            metaItemType = (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
-                    dataset.getMetaItemTypeName());
+            metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
         }
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
             sb.append(fs[i] + " ");
         }
         LOGGER.info("CREATING File Splits: " + sb.toString());
-
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtil.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
         //prepare a LocalResourceMetadata which will be stored in NC's local resource repository
         IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
                 compactionInfo.first, compactionInfo.second);
-        IndexBuilderFactory indexBuilderFactory =
-                new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        splitsAndConstraint.first, resourceFactory, !dataset.isTemp());
+        IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(
+                metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first,
+                resourceFactory, !dataset.isTemp());
         IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
         AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
                 splitsAndConstraint.second);
@@ -292,7 +302,7 @@ public class DatasetUtil {
     }
 
     public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
-            MetadataProvider metadataProvider) throws AsterixException, AlgebricksException {
+            MetadataProvider metadataProvider) throws AlgebricksException {
         String dataverseName = dataverse.getDataverseName();
         Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
         if (dataset == null) {
@@ -313,6 +323,180 @@ public class DatasetUtil {
         return spec;
     }
 
+    /**
+     * Creates a primary index scan operator for a given dataset.
+     *
+     * @param spec,
+     *            the job specification.
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param dataset,
+     *            the dataset to scan.
+     * @param jobId,
+     *            the AsterixDB job id for transaction management.
+     * @return a primary index scan operator.
+     * @throws AlgebricksException
+     */
+    public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
+            Dataset dataset, JobId jobId) throws AlgebricksException {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
+        IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
+        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+        // -Infinity
+        int[] lowKeyFields = null;
+        // +Infinity
+        int[] highKeyFields = null;
+        ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
+                        dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
+                        IRecoveryManager.ResourceType.LSM_BTREE);
+        IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
+                metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
+        BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
+                dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
+                indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false);
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
+                primaryPartitionConstraint);
+        return primarySearchOp;
+    }
+
+    /**
+     * Creates a primary index upsert operator for a given dataset.
+     *
+     * @param spec,
+     *            the job specification.
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param dataset,
+     *            the dataset to upsert.
+     * @param inputRecordDesc,
+     *            the record descriptor for an input tuple.
+     * @param fieldPermutation,
+     *            the field permutation according to the input.
+     * @param missingWriterFactory,
+     *            the factory for customizing missing value serialization.
+     * @return a primary index upsert operator and its location constraints.
+     * @throws AlgebricksException
+     */
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(
+            JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
+        int numKeys = dataset.getPrimaryKeys().size();
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
+        ARecordType itemType = (ARecordType) metadataProvider.findType(dataset);
+        ARecordType metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
+        try {
+            Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                    dataset.getDatasetName());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider
+                    .getSplitProviderAndConstraints(dataset);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (int i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+            boolean hasSecondaries = metadataProvider
+                    .getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
+            IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
+                    storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
+                    storageComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
+            IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+            LSMPrimaryUpsertOperatorDescriptor op;
+            ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+            // add the previous record first
+            int f = 0;
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            f++;
+            // add the previous meta second
+            if (dataset.hasMetaPart()) {
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(metaItemType);
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                f++;
+            }
+            // add the previous filter third
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = DatasetUtil.getFilterField(dataset).get(0);
+                String[] fieldNames = itemType.getFieldNames();
+                int i = 0;
+                for (; i < fieldNames.length; i++) {
+                    if (fieldNames[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                f++;
+            }
+            for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+                outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+            }
+            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
+                    missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
+                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    /**
+     * Creates a dummy key provider operator for the primary index scan.
+     *
+     * @param spec,
+     *            the job specification.
+     * @param dataset,
+     *            the dataset to scan.
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @return a dummy key provider operator.
+     * @throws AlgebricksException
+     */
+    public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
+        AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
+
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
+
     public static boolean isFullyQualifiedName(String datasetName) {
         return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name can't start with a .
     }

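Taken together, the new helpers compose into the canonical full-scan pipeline
that the rebalance job builds (and that SecondaryBTreeOperationsHelper below now
reuses): a constant single-tuple source feeds a primary BTree search over the
whole key range. A hedged sketch of the wiring, assuming dataset and
metadataProvider are in scope and using only the types imported in the diff
above, with the downstream sink elided:

    // Create the job and register the transactional job event listener.
    JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
    JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
    // A one-tuple constant source that triggers the scan on each partition.
    IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
    // Scan the primary index from -Infinity to +Infinity.
    IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, jobId);
    spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
    // ...a consumer such as the operator from createPrimaryIndexUpsertOp
    // (e.g., in a rebalance copy job) would be connected here...
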
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 6d07fc7..96ca8d7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -21,18 +21,24 @@ package org.apache.asterix.metadata.utils;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
 
 public class IndexUtil {
@@ -53,6 +59,13 @@ public class IndexUtil {
         return secondaryFilterFields(dataset, index, filterTypeTraits);
     }
 
+    public static Index getPrimaryIndex(Dataset dataset) {
+        InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
+        return new Index(dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
+                DatasetConfig.IndexType.BTREE, id.getPartitioningKey(), id.getKeySourceIndicator(),
+                id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+    }
+
     public static int[] getBtreeFieldsIfFiltered(Dataset dataset, Index index) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
             return DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
@@ -144,4 +157,22 @@ public class IndexUtil {
                         physicalOptimizationConfig, recType, metaType, enforcedType, enforcedMetaType);
         return secondaryIndexHelper.buildCompactJobSpec();
     }
+
+    /**
+     * Binds a job event listener to the job specification.
+     *
+     * @param spec,
+     *            the job specification.
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @return the AsterixDB job id for transaction management.
+     */
+    public static JobId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) {
+        JobId jobId = JobIdFactory.generateJobId();
+        metadataProvider.setJobId(jobId);
+        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        return jobId;
+    }
 }

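Note that getPrimaryIndex() synthesizes the primary Index entity from the
dataset's own partitioning details instead of fetching it from the metadata
node, which is what allows createDatasetJobSpec above to drop its
MetadataManager.INSTANCE.getIndex(...) lookup. A small illustration of the
invariants this relies on (the assertions are explanatory, not in the patch):

    Index primary = IndexUtil.getPrimaryIndex(dataset);
    // By construction, the primary index shares the dataset's name and is a
    // BTREE over the partitioning keys.
    assert primary.getIndexName().equals(dataset.getDatasetName());
    assert primary.isPrimaryIndex();
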
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 1f7914d..b31bd47 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -51,7 +52,6 @@ import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -128,13 +128,16 @@ public class SecondaryBTreeOperationsHelper extends SecondaryTreeIndexOperations
             return spec;
         } else {
             // Create dummy key provider for feeding the primary index scan.
-            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+            IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+                    metadataProvider);
+            JobId jobId = IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+                    jobId);
 
             // Assign op.
-            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            IOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                 sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87805197/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 86e3911..b437798 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,19 +19,13 @@
 
 package org.apache.asterix.metadata.utils;
 
-import java.io.DataOutput;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor;
@@ -52,9 +46,6 @@ import org.apache.asterix.runtime.evaluators.functions.AndDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,21 +62,11 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 @SuppressWarnings("rawtypes")
@@ -262,59 +243,13 @@ public abstract class SecondaryIndexOperationsHelper {
         primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         if (dataset.hasMetaPart()) {
             primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
-            primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+            primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
         }
         primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
     }
 
     protected abstract void setSecondaryRecDescAndComparators() throws AlgebricksException;
 
-    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
-        // Build dummy tuple containing one field with a dummy value inside.
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        try {
-            // Serialize dummy value into a field.
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-        // Add dummy field.
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                primaryPartitionConstraint);
-        return keyProviderOp;
-    }
-
-    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        // -Infinity
-        int[] lowKeyFields = null;
-        // +Infinity
-        int[] highKeyFields = null;
-        ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        JobId jobId = JobIdFactory.generateJobId();
-        metadataProvider.setJobId(jobId);
-        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
-        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
-                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
-        IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
-        BTreeSearchOperatorDescriptor primarySearchOp =
-                new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, lowKeyFields, highKeyFields, true, true,
-                        indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primaryPartitionConstraint);
-        return primarySearchOp;
-    }
 
     protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec, int numSecondaryKeyFields,
             RecordDescriptor secondaryRecDesc) throws AlgebricksException {


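The one-line itemType-to-metaType change above fixes a latent bug: for datasets
with a meta part, the meta record's type traits were derived from the item type
instead of the meta type. A worked sketch of the slot layout being built, with
the key count illustrative:

    // Layout of primaryTypeTraits (and primaryRecFields) per output tuple:
    //   [0, numPrimaryKeys)    primary key fields (filled from key types, omitted here)
    //   [numPrimaryKeys]       the record itself        -> itemType
    //   [numPrimaryKeys + 1]   the meta record, if any  -> metaType (the fix)
    int numPrimaryKeys = 1; // illustrative
    ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + (dataset.hasMetaPart() ? 2 : 1)];
    primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
    if (dataset.hasMetaPart()) {
        primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
    }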