asterixdb-commits mailing list archives

From amo...@apache.org
Subject [4/9] asterixdb git commit: [ASTERIXDB-1992][ING] Suspend/Resume active entities
Date Thu, 27 Jul 2017 06:35:58 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
deleted file mode 100644
index 3216bfe..0000000
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.management;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent;
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.ActiveLifecycleListener;
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ActivityState;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
-import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.common.messaging.api.INcAddressedMessage;
-import org.apache.asterix.common.metadata.IDataset;
-import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobStatus;
-
-public class ActiveEntityEventsListener implements IActiveEntityEventsListener {
-
-    private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName());
-
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
-    }
-
-    // members
-    protected volatile ActivityState state;
-    protected JobId jobId;
-    protected final List<IActiveEventSubscriber> subscribers = new ArrayList<>();
-    protected final ICcApplicationContext appCtx;
-    protected final EntityId entityId;
-    protected final List<IDataset> datasets;
-    protected final ActiveEvent statsUpdatedEvent;
-    protected long statsTimestamp;
-    protected String stats;
-    protected RequestState statsRequestState;
-    protected final String runtimeName;
-    protected final AlgebricksAbsolutePartitionConstraint locations;
-    protected int numRegistered;
-
-    public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId entityId, List<IDataset> datasets,
-            AlgebricksAbsolutePartitionConstraint locations, String runtimeName) {
-        this.appCtx = appCtx;
-        this.entityId = entityId;
-        this.datasets = datasets;
-        this.state = ActivityState.STOPPED;
-        this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
-        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId);
-        this.stats = "{\"Stats\":\"N/A\"}";
-        this.runtimeName = runtimeName;
-        this.locations = locations;
-        this.numRegistered = 0;
-    }
-
-    @Override
-    public synchronized void notify(ActiveEvent event) {
-        try {
-            LOGGER.finer("EventListener is notified.");
-            ActiveEvent.Kind eventKind = event.getEventKind();
-            switch (eventKind) {
-                case JOB_CREATED:
-                    state = ActivityState.CREATED;
-                    break;
-                case JOB_STARTED:
-                    start(event);
-                    break;
-                case JOB_FINISHED:
-                    finish();
-                    break;
-                case PARTITION_EVENT:
-                    handle((ActivePartitionMessage) event.getEventObject());
-                    break;
-                default:
-                    LOGGER.log(Level.WARNING, "Unhandled feed event notification: " + event);
-                    break;
-            }
-            notifySubscribers(event);
-        } catch (Exception e) {
-            LOGGER.log(Level.SEVERE, "Unhandled Exception", e);
-        }
-    }
-
-    protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
-            numRegistered++;
-            if (numRegistered == locations.getLocations().length) {
-                state = ActivityState.STARTED;
-            }
-        }
-    }
-
-    private void finish() throws Exception {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        JobStatus status = hcc.getJobStatus(jobId);
-        state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : ActivityState.STOPPED;
-        ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        activeLcListener.getNotificationHandler().removeListener(this);
-    }
-
-    private void start(ActiveEvent event) {
-        this.jobId = event.getJobId();
-        state = ActivityState.STARTING;
-    }
-
-    @Override
-    public synchronized void subscribe(IActiveEventSubscriber subscriber) throws HyracksDataException {
-        if (this.state == ActivityState.FAILED) {
-            throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
-        }
-        subscriber.subscribed(this);
-        if (!subscriber.isDone()) {
-            subscribers.add(subscriber);
-        }
-    }
-
-    @Override
-    public EntityId getEntityId() {
-        return entityId;
-    }
-
-    @Override
-    public ActivityState getState() {
-        return state;
-    }
-
-    @Override
-    public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
-    }
-
-    public JobId getJobId() {
-        return jobId;
-    }
-
-    @Override
-    public String getStats() {
-        return stats;
-    }
-
-    @Override
-    public long getStatsTimeStamp() {
-        return statsTimestamp;
-    }
-
-    public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void refreshStats(long timeout) throws HyracksDataException {
-        LOGGER.log(Level.INFO, "refreshStats called");
-        synchronized (this) {
-            if (state != ActivityState.STARTED || statsRequestState == RequestState.STARTED) {
-                LOGGER.log(Level.INFO, "returning immediately since state = " + state + " and statsRequestState = "
-                        + statsRequestState);
-                return;
-            } else {
-                statsRequestState = RequestState.STARTED;
-            }
-        }
-        ICCMessageBroker messageBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
-        long reqId = messageBroker.newRequestId();
-        List<INcAddressedMessage> requests = new ArrayList<>();
-        List<String> ncs = Arrays.asList(locations.getLocations());
-        for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
-                    new ActiveRuntimeId(entityId, runtimeName, i), reqId));
-        }
-        try {
-            List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
-            stats = formatStats(responses);
-            statsTimestamp = System.currentTimeMillis();
-            notifySubscribers(statsUpdatedEvent);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-        // Same as above
-        statsRequestState = RequestState.FINISHED;
-    }
-
-    protected synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEventSubscriber subscriber = it.next();
-            if (subscriber.isDone()) {
-                it.remove();
-            } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARNING, "Failed to notify subscriber", e);
-                }
-                if (subscriber.isDone()) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getLocations() {
-        return locations;
-    }
-
-}
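
The file deleted above was the CC-side coordination point between an active entity's job events and any threads waiting on that entity. Its core handshake is a single monitor shared by the listener and its subscribers: every event triggers notifyAll(), and each subscriber then decides for itself whether it is done. The AbstractSubscriber changes below keep this handshake but move the wait onto the listener's monitor. A minimal, self-contained sketch of the pattern (stand-in types, not the project's interfaces):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    interface EventSubscriber {
        boolean isDone();
        void onEvent(String event); // stand-in for notify(ActiveEvent)
    }

    class ListenerSketch {
        private final List<EventSubscriber> subscribers = new ArrayList<>();

        // Mirrors notifySubscribers(ActiveEvent) in the deleted class.
        synchronized void publish(String event) {
            notifyAll(); // wake threads blocked in awaitDone()
            Iterator<EventSubscriber> it = subscribers.iterator();
            while (it.hasNext()) {
                EventSubscriber s = it.next();
                if (s.isDone()) {
                    it.remove();
                } else {
                    s.onEvent(event);
                    if (s.isDone()) {
                        it.remove(); // completed by this event
                    }
                }
            }
        }

        synchronized void awaitDone(EventSubscriber s) throws InterruptedException {
            while (!s.isDone()) {
                wait(); // released by notifyAll() in publish()
            }
        }
    }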

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index e6ac265..d102d0c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -77,10 +77,14 @@ public class AdapterExecutor implements Runnable {
                 // Adapter has completed execution
                 continueIngestion = false;
             } catch (InterruptedException e) {
+                adapter.fail();
                 throw e;
             } catch (Exception e) {
                 LOGGER.error("Exception during feed ingestion ", e);
                 continueIngestion = adapter.handleException(e);
+                if (!continueIngestion) {
+                    adapter.fail();
+                }
                 failedIngestion = !continueIngestion;
                 restartCount++;
             }
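
The two additions above make the executor signal adapter failure both on interrupt and when handleException() declines to retry, evidently so that consumers of the adapter's output observe the failure rather than wait indefinitely. A self-contained sketch of the resulting loop shape (hypothetical Adapter interface; the real run() also manages a writer and logging):

    interface Adapter {
        void start() throws Exception;
        boolean handleException(Exception e); // true = retry ingestion
        void fail();                          // unblock downstream consumers
    }

    final class IngestionLoopSketch {
        /** Returns true if ingestion ended in failure. */
        static boolean run(Adapter adapter) throws InterruptedException {
            boolean continueIngestion = true;
            boolean failedIngestion = false;
            while (continueIngestion) {
                try {
                    adapter.start();
                    continueIngestion = false; // adapter completed normally
                } catch (InterruptedException e) {
                    adapter.fail(); // added by this change
                    throw e;
                } catch (Exception e) {
                    continueIngestion = adapter.handleException(e);
                    if (!continueIngestion) {
                        adapter.fail(); // added by this change
                    }
                    failedIngestion = !continueIngestion;
                }
            }
            return failedIngestion;
        }
    }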

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 822d725..e21f9eb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -18,39 +18,48 @@
  */
 package org.apache.asterix.external.feed.watch;
 
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public abstract class AbstractSubscriber implements IActiveEventSubscriber {
+public abstract class AbstractSubscriber implements IActiveEntityEventSubscriber {
 
     protected final IActiveEntityEventsListener listener;
-    private boolean done = false;
+    private volatile boolean done = false;
+    private volatile Exception failure = null;
 
     public AbstractSubscriber(IActiveEntityEventsListener listener) {
         this.listener = listener;
     }
 
     @Override
-    public synchronized boolean isDone() {
+    public boolean isDone() {
         return done;
     }
 
-    public synchronized void complete() throws HyracksDataException {
-        done = true;
-        notifyAll();
+    public void complete(Exception failure) {
+        synchronized (listener) {
+            if (failure != null) {
+                this.failure = failure;
+            }
+            done = true;
+            listener.notifyAll();
+        }
     }
 
     @Override
-    public synchronized void sync() throws InterruptedException {
-        while (!done) {
-            wait();
+    public void sync() throws HyracksDataException, InterruptedException {
+        synchronized (listener) {
+            while (!done) {
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
+                }
+                listener.wait();
+            }
         }
     }
 
-    @Override
-    public synchronized void unsubscribe() {
-        done = true;
-        notifyAll();
+    public Exception getFailure() {
+        return failure;
     }
 }
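
Three coordinated changes here: the subscriber's own monitor is replaced by the listener's (matching the synchronized notifySubscribers/notifyAll on the listener side), done and the new failure field become volatile so isDone() no longer needs locking, and complete() now carries an optional failure. Note one subtlety of the new sync(): the failure check sits inside the !done loop, and complete() always sets done together with the failure, so a failure delivered at completion ends the wait normally and is observed through getFailure() rather than thrown. A condensed, self-contained sketch of the handshake (Object stands in for the listener):

    final class SubscriberSketch {
        private final Object listenerMonitor; // the listener in the real code
        private volatile boolean done = false;
        private volatile Exception failure = null;

        SubscriberSketch(Object listenerMonitor) {
            this.listenerMonitor = listenerMonitor;
        }

        void complete(Exception failure) {
            synchronized (listenerMonitor) {
                if (failure != null) {
                    this.failure = failure;
                }
                done = true;
                listenerMonitor.notifyAll();
            }
        }

        void sync() throws Exception {
            synchronized (listenerMonitor) {
                while (!done) {
                    if (failure != null) {
                        throw failure; // wrapped in HyracksDataException upstream
                    }
                    listenerMonitor.wait();
                }
            }
        }

        Exception getFailure() {
            return failure;
        }
    }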

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 42f7a74..8230b48 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -19,14 +19,14 @@
 package org.apache.asterix.external.feed.watch;
 
 import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * An event subscriber that does not listen to any events
  */
-public class NoOpSubscriber implements IActiveEventSubscriber {
+public class NoOpSubscriber implements IActiveEntityEventSubscriber {
 
     public static final NoOpSubscriber INSTANCE = new NoOpSubscriber();
 
@@ -49,11 +49,6 @@ public class NoOpSubscriber implements IActiveEventSubscriber {
     }
 
     @Override
-    public void unsubscribe() {
-        // no op
-    }
-
-    @Override
     public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
         // no op
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
index fa2fa7f..a571904 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -32,7 +32,17 @@ public class StatsSubscriber extends AbstractSubscriber {
     @Override
     public void notify(ActiveEvent event) throws HyracksDataException {
         if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
-            complete();
+            try {
+                complete(null);
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } else if (event.getEventKind() == ActiveEvent.Kind.FAILURE) {
+            try {
+                complete((Exception) event.getEventObject());
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
         }
     }
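
With failures now first-class, StatsSubscriber completes either on the STATS_UPDATED event it was created for or on a FAILURE event carrying the exception as its event object. A hedged usage sketch (the listener variable, the timeout value, and self-registration on construction, as WaitForStateSubscriber does, are assumptions):

    StatsSubscriber subscriber = new StatsSubscriber(listener);
    listener.refreshStats(5000L); // publishes STATS_UPDATED when responses arrive
    subscriber.sync();            // unblocks on STATS_UPDATED or FAILURE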
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 7bab421..a1cdfb0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -18,38 +18,46 @@
  */
 package org.apache.asterix.external.feed.watch;
 
+import java.util.Set;
+
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class WaitForStateSubscriber extends AbstractSubscriber {
 
-    private final ActivityState targetState;
+    private final Set<ActivityState> targetStates;
 
-    public WaitForStateSubscriber(IActiveEntityEventsListener listener, ActivityState targetState)
+    public WaitForStateSubscriber(IActiveEntityEventsListener listener, Set<ActivityState> targetStates)
             throws HyracksDataException {
         super(listener);
-        this.targetState = targetState;
+        this.targetStates = targetStates;
         listener.subscribe(this);
     }
 
     @Override
     public void notify(ActiveEvent event) throws HyracksDataException {
-        if (listener.getState() == targetState) {
-            complete();
+        if (targetStates.contains(listener.getState())) {
+            if (listener.getState() == ActivityState.PERMANENTLY_FAILED
+                    || listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                complete(listener.getJobFailure());
+            } else {
+                complete(null);
+            }
+        } else if (event != null && event.getEventKind() == ActiveEvent.Kind.FAILURE) {
+            try {
+                complete((Exception) event.getEventObject());
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
         }
     }
 
     @Override
     public void subscribed(IActiveEntityEventsListener eventsListener) throws HyracksDataException {
-        if (eventsListener.getState() == ActivityState.FAILED) {
-            throw new RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
-        }
-        if (listener.getState() == targetState) {
-            complete();
+        if (targetStates.contains(listener.getState())) {
+            complete(null);
         }
     }
 }
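
The subscriber now waits for any of a set of states instead of a single one, which is what suspend/resume needs: a caller typically wants "started or failed", not one exact state. When a failed state is reached, the listener's job failure is handed to complete() so that sync() or getFailure() surfaces it. Note also that the old guard rejecting subscriptions to a FAILED entity is gone; failed states are now legitimate wait targets. A hypothetical usage sketch using states that appear in this change (requires java.util.EnumSet):

    WaitForStateSubscriber subscriber = new WaitForStateSubscriber(listener,
            EnumSet.of(ActivityState.STARTED, ActivityState.PERMANENTLY_FAILED));
    subscriber.sync(); // returns once the entity reaches either state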

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
index 1d8ae5e..f0539c6 100644
--- a/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
+++ b/asterixdb/asterix-lang-aql/src/main/java/org/apache/asterix/lang/aql/statement/SubscribeFeedStatement.java
@@ -25,6 +25,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.watch.FeedActivityDetails;
@@ -37,7 +38,6 @@ import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Feed;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
index 3289d68..3d8fc68 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ConnectFeedStatement.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.lang.common.statement;
 
+import java.util.List;
+
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Statement;
@@ -26,9 +28,6 @@ import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class ConnectFeedStatement implements Statement {
 
     private final Identifier dataverseName;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index c84a5bd..08ca03b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -57,8 +57,7 @@ public class MetadataCache {
     // Key is dataverse name. Key of value map is dataset name.
     protected final Map<String, Map<String, Dataset>> datasets = new HashMap<>();
     // Key is dataverse name. Key of value map is dataset name. Key of value map of value map is index name.
-    protected final Map<String, Map<String, Map<String, Index>>> indexes =
-            new HashMap<>();
+    protected final Map<String, Map<String, Map<String, Index>>> indexes = new HashMap<>();
     // Key is dataverse name. Key of value map is datatype name.
     protected final Map<String, Map<String, Datatype>> datatypes = new HashMap<>();
     // Key is dataverse name.
@@ -66,19 +65,16 @@ public class MetadataCache {
     // Key is function Identifier . Key of value map is function name.
     protected final Map<FunctionSignature, Function> functions = new HashMap<>();
     // Key is adapter dataverse. Key of value map is the adapter name
-    protected final Map<String, Map<String, DatasourceAdapter>> adapters =
-            new HashMap<>();
+    protected final Map<String, Map<String, DatasourceAdapter>> adapters = new HashMap<>();
 
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies =
-            new HashMap<>();
+    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = new HashMap<>();
     // Key is library dataverse. Key of value map is the library name
     protected final Map<String, Map<String, Library>> libraries = new HashMap<>();
     // Key is library dataverse. Key of value map is the feed name
     protected final Map<String, Map<String, Feed>> feeds = new HashMap<>();
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies =
-            new HashMap<>();
+    protected final Map<String, Map<String, CompactionPolicy>> compactionPolicies = new HashMap<>();
     // Key is DataverseName, Key of value map is feedConnectionId
     protected final Map<String, Map<String, FeedConnection>> feedConnections = new HashMap<>();
 
@@ -247,8 +243,7 @@ public class MetadataCache {
                                             datatypes.remove(dataverse.getDataverseName());
                                             adapters.remove(dataverse.getDataverseName());
                                             compactionPolicies.remove(dataverse.getDataverseName());
-                                            List<FunctionSignature> markedFunctionsForRemoval =
-                                                    new ArrayList<>();
+                                            List<FunctionSignature> markedFunctionsForRemoval = new ArrayList<>();
                                             for (FunctionSignature signature : functions.keySet()) {
                                                 if (signature.getNamespace().equals(dataverse.getDataverseName())) {
                                                     markedFunctionsForRemoval.add(signature);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java
deleted file mode 100644
index 61c21f4..0000000
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.metadata;
-
-import org.apache.asterix.common.exceptions.CompilationException;
-
-import java.io.Serializable;
-
-public class MetadataException extends CompilationException {
-    private static final long serialVersionUID = 1L;
-
-    public MetadataException(String message) {
-        super(message);
-    }
-
-    public MetadataException(Throwable cause) {
-        super(cause);
-    }
-
-    public MetadataException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MetadataException(int errorCode, Serializable... params) {
-        super(errorCode, params);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 11645e8..5e6d11a 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -28,6 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.MetadataException;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
@@ -149,7 +152,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addDataverse(ctx.getJobId(), dataverse);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.addDataverse(dataverse);
     }
@@ -159,7 +162,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropDataverse(ctx.getJobId(), dataverseName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropDataverse(dataverseName);
     }
@@ -169,7 +172,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             return metadataNode.getDataverses(ctx.getJobId());
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -196,7 +199,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             dataverse = metadataNode.getDataverse(ctx.getJobId(), dataverseName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the dataverse from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -227,7 +230,7 @@ public class MetadataManager implements IMetadataManager {
             // metadata node.
             dataverseDatasets.addAll(metadataNode.getDataverseDatasets(ctx.getJobId(), dataverseName));
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // Don't update the cache to avoid checking against the transaction's
         // uncommitted datasets.
@@ -241,7 +244,7 @@ public class MetadataManager implements IMetadataManager {
             try {
                 metadataNode.addDataset(ctx.getJobId(), dataset);
             } catch (RemoteException e) {
-                throw new MetadataException(e);
+                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
         }
 
@@ -258,7 +261,7 @@ public class MetadataManager implements IMetadataManager {
             try {
                 metadataNode.dropDataset(ctx.getJobId(), dataverseName, datasetName);
             } catch (RemoteException e) {
-                throw new MetadataException(e);
+                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
         }
 
@@ -292,7 +295,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             dataset = metadataNode.getDataset(ctx.getJobId(), dataverseName, datasetName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the dataset from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -318,7 +321,7 @@ public class MetadataManager implements IMetadataManager {
                 // for persistent datasets
                 datasetIndexes = metadataNode.getDatasetIndexes(ctx.getJobId(), dataverseName, datasetName);
             } catch (RemoteException e) {
-                throw new MetadataException(e);
+                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
         }
         return datasetIndexes;
@@ -330,7 +333,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addCompactionPolicy(mdTxnCtx.getJobId(), compactionPolicy);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.addCompactionPolicy(compactionPolicy);
     }
@@ -343,7 +346,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             compactionPolicy = metadataNode.getCompactionPolicy(ctx.getJobId(), dataverse, policyName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return compactionPolicy;
     }
@@ -353,13 +356,13 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addDatatype(ctx.getJobId(), datatype);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         try {
             ctx.addDatatype(
                     metadataNode.getDatatype(ctx.getJobId(), datatype.getDataverseName(), datatype.getDatatypeName()));
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -369,7 +372,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropDatatype(ctx.getJobId(), dataverseName, datatypeName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropDataDatatype(dataverseName, datatypeName);
     }
@@ -406,7 +409,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             datatype = metadataNode.getDatatype(ctx.getJobId(), dataverseName, datatypeName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the datatype from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -425,7 +428,7 @@ public class MetadataManager implements IMetadataManager {
             try {
                 metadataNode.addIndex(ctx.getJobId(), index);
             } catch (RemoteException e) {
-                throw new MetadataException(e);
+                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
         }
         ctx.addIndex(index);
@@ -436,7 +439,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addAdapter(mdTxnCtx.getJobId(), adapter);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.addAdapter(adapter);
 
@@ -452,7 +455,7 @@ public class MetadataManager implements IMetadataManager {
             try {
                 metadataNode.dropIndex(ctx.getJobId(), dataverseName, datasetName, indexName);
             } catch (RemoteException e) {
-                throw new MetadataException(e);
+                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
         }
         ctx.dropIndex(dataverseName, datasetName, indexName);
@@ -485,7 +488,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             index = metadataNode.getIndex(ctx.getJobId(), dataverseName, datasetName, indexName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the index from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -500,7 +503,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addNode(ctx.getJobId(), node);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -509,7 +512,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addNodeGroup(ctx.getJobId(), nodeGroup);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.addNodeGroup(nodeGroup);
     }
@@ -521,7 +524,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             dropped = metadataNode.dropNodegroup(ctx.getJobId(), nodeGroupName, failSilently);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         if (dropped) {
             ctx.dropNodeGroup(nodeGroupName);
@@ -551,7 +554,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             nodeGroup = metadataNode.getNodeGroup(ctx.getJobId(), nodeGroupName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the nodeGroup from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -566,7 +569,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.updateFunction(mdTxnCtx.getJobId(), function);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.dropFunction(function);
         mdTxnCtx.addFunction(function);
@@ -577,7 +580,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addFunction(mdTxnCtx.getJobId(), function);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.addFunction(function);
     }
@@ -588,7 +591,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropFunction(ctx.getJobId(), functionSignature);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropFunction(functionSignature);
     }
@@ -622,7 +625,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             function = metadataNode.getFunction(ctx.getJobId(), functionSignature);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // We fetched the function from the MetadataNode. Add it to the cache
         // when this transaction commits.
@@ -639,7 +642,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addFeedPolicy(mdTxnCtx.getJobId(), feedPolicy);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.addFeedPolicy(feedPolicy);
     }
@@ -649,7 +652,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.initializeDatasetIdFactory(ctx.getJobId());
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -658,7 +661,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             return metadataNode.getMostRecentDatasetId();
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -671,7 +674,7 @@ public class MetadataManager implements IMetadataManager {
             // metadata node.
             dataverseFunctions = metadataNode.getDataverseFunctions(ctx.getJobId(), dataverseName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // Don't update the cache to avoid checking against the transaction's
         // uncommitted functions.
@@ -684,7 +687,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropAdapter(ctx.getJobId(), dataverseName, name);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -695,7 +698,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             adapter = metadataNode.getAdapter(ctx.getJobId(), dataverseName, name);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return adapter;
     }
@@ -706,7 +709,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropLibrary(ctx.getJobId(), dataverseName, libraryName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropLibrary(dataverseName, libraryName);
     }
@@ -720,7 +723,7 @@ public class MetadataManager implements IMetadataManager {
             // metadata node.
             dataverseLibaries = metadataNode.getDataverseLibraries(ctx.getJobId(), dataverseName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // Don't update the cache to avoid checking against the transaction's
         // uncommitted functions.
@@ -732,7 +735,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addLibrary(ctx.getJobId(), library);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.addLibrary(library);
     }
@@ -744,7 +747,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             library = metadataNode.getLibrary(ctx.getJobId(), dataverseName, libraryName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return library;
     }
@@ -777,7 +780,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             feedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return feedPolicy;
     }
@@ -788,7 +791,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return feed;
     }
@@ -806,7 +809,7 @@ public class MetadataManager implements IMetadataManager {
                 ctx.dropFeedConnection(dataverse, feedName, feedConnection.getDatasetName());
             }
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropFeed(feed);
     }
@@ -816,7 +819,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addFeed(ctx.getJobId(), feed);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.addFeed(feed);
     }
@@ -827,7 +830,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addFeedConnection(ctx.getJobId(), feedConnection);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.addFeedConnection(feedConnection);
     }
@@ -838,7 +841,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         ctx.dropFeedConnection(dataverseName, feedName, datasetName);
     }
@@ -849,7 +852,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             return metadataNode.getFeedConnection(ctx.getJobId(), dataverseName, feedName, datasetName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -859,7 +862,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             return metadataNode.getFeedConnections(ctx.getJobId(), dataverseName, feedName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -870,7 +873,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return dataverseAdapters;
     }
@@ -883,7 +886,7 @@ public class MetadataManager implements IMetadataManager {
             feedPolicy = metadataNode.getFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
             metadataNode.dropFeedPolicy(mdTxnCtx.getJobId(), dataverseName, policyName);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         mdTxnCtx.dropFeedPolicy(feedPolicy);
     }
@@ -894,7 +897,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             dataverseFeedPolicies = metadataNode.getDataversePolicies(mdTxnCtx.getJobId(), dataverse);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return dataverseFeedPolicies;
     }
@@ -906,7 +909,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             externalFiles = metadataNode.getExternalFiles(mdTxnCtx.getJobId(), dataset);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return externalFiles;
     }
@@ -916,7 +919,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addExternalFile(ctx.getJobId(), externalFile);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -926,7 +929,7 @@ public class MetadataManager implements IMetadataManager {
             metadataNode.dropExternalFile(ctx.getJobId(), externalFile.getDataverseName(),
                     externalFile.getDatasetName(), externalFile.getFileNumber());
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -937,7 +940,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         return file;
     }
@@ -949,7 +952,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.dropExternalFiles(mdTxnCtx.getJobId(), dataset);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -958,7 +961,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.updateDataset(ctx.getJobId(), dataset);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
         // reflect the dataset into the cache
         ctx.dropDataset(dataset.getDataverseName(), dataset.getDatasetName());
@@ -984,7 +987,17 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.addEntity(mdTxnCtx.getJobId(), entity);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+        }
+    }
+
+    @Override
+    public <T extends IExtensionMetadataEntity> void upsertEntity(MetadataTransactionContext mdTxnCtx, T entity)
+            throws MetadataException {
+        try {
+            metadataNode.upsertEntity(mdTxnCtx.getJobId(), entity);
+        } catch (RemoteException e) {
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -994,7 +1007,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             metadataNode.deleteEntity(mdTxnCtx.getJobId(), entity);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -1004,7 +1017,7 @@ public class MetadataManager implements IMetadataManager {
         try {
             return metadataNode.getEntities(mdTxnCtx.getJobId(), searchKey);
         } catch (RemoteException e) {
-            throw new MetadataException(e);
+            throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
         }
     }
 
@@ -1046,9 +1059,9 @@ public class MetadataManager implements IMetadataManager {
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             } catch (RemoteException e) {
-                throw new HyracksDataException(e);
+                throw new RuntimeDataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
             }
             super.init();
         }
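
With MetadataException relocated from asterix-metadata to asterix-common (the deleted file above and the new import in this file), every RemoteException from the metadata node is now wrapped with the explicit error code REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE instead of being passed as a bare cause. The pattern repeats across dozens of methods; a hedged refactoring sketch, not part of this commit, showing how a small helper could keep each call a one-liner:

    import java.rmi.RemoteException;

    @FunctionalInterface
    interface RemoteCall<T> {
        T call() throws RemoteException;
    }

    final class RemoteWrap {
        static <T> T call(RemoteCall<T> body) throws MetadataException {
            try {
                return body.call();
            } catch (RemoteException e) {
                throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
            }
        }
    }

    // e.g.: Dataverse dv = RemoteWrap.call(() -> metadataNode.getDataverse(ctx.getJobId(), name));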

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/a85f4121/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 84cb671..265e533 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.LSMIndexUtil;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.AbstractOperationCallback;
@@ -97,6 +98,7 @@ import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
+import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
 import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.transaction.management.service.transaction.TransactionContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -124,8 +126,8 @@ import org.apache.hyracks.storage.common.MultiComparator;
 public class MetadataNode implements IMetadataNode {
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = Logger.getLogger(MetadataNode.class.getName());
-    private static final DatasetId METADATA_DATASET_ID = new ImmutableDatasetId(
-            MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
+    private static final DatasetId METADATA_DATASET_ID =
+            new ImmutableDatasetId(MetadataPrimaryIndexes.PROPERTIES_METADATA.getDatasetId());
 
     // shared between core and extension
     private IDatasetLifecycleManager datasetLifecycleManager;
@@ -173,8 +175,8 @@ public class MetadataNode implements IMetadataNode {
     @Override
     public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
         try {
-            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
-                    false);
+            ITransactionContext txnCtx =
+                    transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, DatasetId.NULL, -1);
         } catch (ACIDException e) {
             LOGGER.log(Level.WARNING, "Exception aborting transaction", e);
@@ -215,6 +217,25 @@ public class MetadataNode implements IMetadataNode {
     }
 
     /**
+     * Upserts an entity into the given metadata index, replacing any existing entity with the same key.
+     *
+     * @param jobId the job id of the metadata transaction
+     * @param entity the metadata entity to upsert
+     * @param tupleTranslator the translator used to write the entity as an index tuple
+     * @param index the metadata index receiving the upsert
+     * @throws MetadataException if the tuple translation or the index upsert fails
+     */
+    private <T> void upsertEntity(JobId jobId, T entity, IMetadataEntityTupleTranslator<T> tupleTranslator,
+            IMetadataIndex index) throws MetadataException {
+        try {
+            ITupleReference tuple = tupleTranslator.getTupleFromMetadataEntity(entity);
+            upsertTupleIntoIndex(jobId, index, tuple);
+        } catch (HyracksDataException | ACIDException e) {
+            throw new MetadataException(e);
+        }
+    }
+
+    /**
      * Delete entity from index
      *
      * @param jobId
@@ -271,6 +292,18 @@ public class MetadataNode implements IMetadataNode {
 
     @SuppressWarnings("unchecked")
     @Override
+    public <T extends IExtensionMetadataEntity> void upsertEntity(JobId jobId, T entity)
+            throws MetadataException, RemoteException {
+        ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets.get(entity.getDatasetId());
+        if (index == null) {
+            throw new MetadataException("Metadata Extension Index: " + entity.getDatasetId() + " was not found");
+        }
+        IMetadataEntityTupleTranslator<T> tupleTranslator = index.getTupleTranslator();
+        upsertEntity(jobId, entity, tupleTranslator, index);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
     public <T extends IExtensionMetadataEntity> void deleteEntity(JobId jobId, T entity)
             throws MetadataException, RemoteException {
         ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets.get(entity.getDatasetId());
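Both the new upsert path and the existing delete path resolve the target dataset through
the entity's reported dataset id. A hypothetical extension entity wired for this lookup
might look as follows (class and field names are illustrative, and it assumes
getDatasetId() returning ExtensionMetadataDatasetId is the interface's only requirement
here; the id must match the one under which the ExtensionMetadataDataset was registered,
or the lookup above returns null and a MetadataException is thrown):

    class SampleExtensionEntity implements IExtensionMetadataEntity {
        private static final long serialVersionUID = 1L;
        private final ExtensionMetadataDatasetId datasetId;

        SampleExtensionEntity(ExtensionMetadataDatasetId datasetId) {
            this.datasetId = datasetId;
        }

        @Override
        public ExtensionMetadataDatasetId getDatasetId() {
            return datasetId;
        }
    }
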
@@ -285,8 +318,8 @@ public class MetadataNode implements IMetadataNode {
     @Override
     public <T extends IExtensionMetadataEntity> List<T> getEntities(JobId jobId, IExtensionMetadataSearchKey searchKey)
             throws MetadataException, RemoteException {
-        ExtensionMetadataDataset<T> index = (ExtensionMetadataDataset<T>) extensionDatasets
-                .get(searchKey.getDatasetId());
+        ExtensionMetadataDataset<T> index =
+                (ExtensionMetadataDataset<T>) extensionDatasets.get(searchKey.getDatasetId());
         if (index == null) {
             throw new MetadataException("Metadata Extension Index: " + searchKey.getDatasetId() + " was not found");
         }
@@ -396,8 +429,8 @@ public class MetadataNode implements IMetadataNode {
     @Override
     public void addDatatype(JobId jobId, Datatype datatype) throws MetadataException, RemoteException {
         try {
-            DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this,
-                    true);
+            DatatypeTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, true);
             ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(datatype);
             insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple);
         } catch (HyracksDataException e) {
@@ -443,13 +476,13 @@ public class MetadataNode implements IMetadataNode {
             datasetLifecycleManager.open(resourceName);
 
             // prepare a Callback for logging
-            IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, Operation.INSERT);
+            IModificationOperationCallback modCallback =
+                    createIndexModificationCallback(jobId, resourceID, metadataIndex, lsmIndex, Operation.INSERT);
 
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
-                    false);
+            ITransactionContext txnCtx =
+                    transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             txnCtx.setWriteTxn(true);
             txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                     metadataIndex.isPrimaryIndex());
@@ -471,6 +504,39 @@ public class MetadataNode implements IMetadataNode {
         }
     }
 
+    private void upsertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
+            throws ACIDException, HyracksDataException {
+        long resourceId = metadataIndex.getResourceId();
+        String resourceName = metadataIndex.getFile().getRelativePath();
+        ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
+        datasetLifecycleManager.open(resourceName);
+        try {
+            // prepare a Callback for logging
+            ITransactionContext txnCtx =
+                    transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+            IModificationOperationCallback modCallback =
+                    new UpsertOperationCallback(metadataIndex.getDatasetId(), metadataIndex.getPrimaryKeyIndexes(),
+                            txnCtx, transactionSubsystem.getLockManager(), transactionSubsystem, resourceId,
+                            metadataStoragePartition, ResourceType.LSM_BTREE, Operation.UPSERT);
+            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
+            txnCtx.setWriteTxn(true);
+            txnCtx.registerIndexAndCallback(resourceId, lsmIndex, (AbstractOperationCallback) modCallback,
+                    metadataIndex.isPrimaryIndex());
+            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
+            indexAccessor.forceUpsert(tuple);
+            // Manually complete the operation after the upsert. This decrements the resource counters within the
+            // index that track how many tuples are still 'in-flight' within the index. Normally the log flusher
+            // does this; the only exception is the index registered as the "primary", whose counter we let the
+            // job commit log event decrement.
+            if (!((TransactionContext) txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
+                lsmIndex.getOperationTracker().completeOperation(lsmIndex, LSMOperationType.FORCE_MODIFICATION, null,
+                        modCallback);
+            }
+        } finally {
+            datasetLifecycleManager.close(resourceName);
+        }
+    }
+
     private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
             IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation indexOp) throws ACIDException {
         ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
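One design note on upsertTupleIntoIndex above: it uses the force variant of the
modification and then balances the in-flight operation counter by hand for every index
except the primary, whose counter the job-commit log event decrements. A condensed sketch
of the contrast (tuple, lsmIndex, and modCallback as in the method above; semantics
assumed from the ILSMIndexAccessor contract):

    // Regular dataset path: the modification goes through the normal
    // operation-tracker entry protocol.
    indexAccessor.upsert(tuple);

    // Metadata path used here: force the modification through, then complete
    // the operation manually so the in-flight counter is decremented.
    indexAccessor.forceUpsert(tuple);
    lsmIndex.getOperationTracker().completeOperation(lsmIndex,
            LSMOperationType.FORCE_MODIFICATION, null, modCallback);
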
@@ -748,12 +814,12 @@ public class MetadataNode implements IMetadataNode {
         try {
             datasetLifecycleManager.open(resourceName);
             // prepare a Callback for logging
-            IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID,
-                    metadataIndex, lsmIndex, Operation.DELETE);
+            IModificationOperationCallback modCallback =
+                    createIndexModificationCallback(jobId, resourceID, metadataIndex, lsmIndex, Operation.DELETE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
 
-            ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
-                    false);
+            ITransactionContext txnCtx =
+                    transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
             txnCtx.setWriteTxn(true);
             txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
                     metadataIndex.isPrimaryIndex());
@@ -852,8 +918,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName);
-            DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this,
-                    false);
+            DatatypeTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false);
             IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<Datatype> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results);
@@ -897,8 +963,8 @@ public class MetadataNode implements IMetadataNode {
     public List<Datatype> getAllDatatypes(JobId jobId) throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = null;
-            DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this,
-                    false);
+            DatatypeTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false);
             IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<Datatype> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results);
@@ -1010,8 +1076,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName);
-            IndexTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getIndexTupleTranslator(jobId, this,
-                    false);
+            IndexTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, false);
             IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<Index> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results);
@@ -1029,8 +1095,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName, datasetName);
-            IndexTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getIndexTupleTranslator(jobId, this,
-                    false);
+            IndexTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getIndexTupleTranslator(jobId, this, false);
             IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<Index> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results);
@@ -1045,8 +1111,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName, datatypeName);
-            DatatypeTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this,
-                    false);
+            DatatypeTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getDataTypeTupleTranslator(jobId, this, false);
             IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<Datatype> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results);
@@ -1111,8 +1177,8 @@ public class MetadataNode implements IMetadataNode {
                     "" + functionSignature.getArity());
             // Searches the index for the tuple to be deleted. Acquires an S
             // lock on the 'function' dataset.
-            ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET,
-                    searchKey);
+            ITupleReference functionTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple);
 
             // TODO: Change this to be a BTree specific exception, e.g.,
@@ -1153,8 +1219,8 @@ public class MetadataNode implements IMetadataNode {
             String resourceName = index.getFile().toString();
             IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
-            IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            IIndexAccessor indexAccessor =
+                    indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
 
             RangePredicate rangePred = null;
@@ -1174,8 +1240,8 @@ public class MetadataNode implements IMetadataNode {
             index = MetadataPrimaryIndexes.DATASET_DATASET;
             indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
-            indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            indexAccessor =
+                    indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
 
             rangePred = null;
@@ -1196,8 +1262,8 @@ public class MetadataNode implements IMetadataNode {
             index = MetadataPrimaryIndexes.INDEX_DATASET;
             indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
-            indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
+            indexAccessor =
+                    indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
             rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
 
             rangePred = null;
@@ -1232,8 +1298,8 @@ public class MetadataNode implements IMetadataNode {
         String resourceName = index.getFile().getRelativePath();
         IIndex indexInstance = datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
-        IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                NoOpOperationCallback.INSTANCE);
+        IIndexAccessor indexAccessor =
+                indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
         ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
 
         IBinaryComparator[] searchCmps = null;
@@ -1271,8 +1337,8 @@ public class MetadataNode implements IMetadataNode {
             IIndex indexInstance = datasetLifecycleManager.get(resourceName);
             datasetLifecycleManager.open(resourceName);
             try {
-                IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
-                        NoOpOperationCallback.INSTANCE);
+                IIndexAccessor indexAccessor =
+                        indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false);
 
                 DatasetTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getDatasetTupleTranslator(false);
@@ -1310,8 +1376,8 @@ public class MetadataNode implements IMetadataNode {
     // Hyracks version.
     public static ITupleReference createTuple(String... fields) {
         @SuppressWarnings("unchecked")
-        ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(BuiltinType.ASTRING);
+        ISerializerDeserializer<AString> stringSerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
         AMutableString aString = new AMutableString("");
         ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fields.length);
         for (String s : fields) {
@@ -1348,8 +1414,8 @@ public class MetadataNode implements IMetadataNode {
     public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException {
         try {
             // Insert into the 'Adapter' dataset.
-            DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getAdapterTupleTranslator(true);
+            DatasourceAdapterTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getAdapterTupleTranslator(true);
             ITupleReference adapterTuple = tupleReaderWriter.getTupleFromMetadataEntity(adapter);
             insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple);
         } catch (HyracksDataException e) {
@@ -1376,8 +1442,8 @@ public class MetadataNode implements IMetadataNode {
             ITupleReference searchKey = createTuple(dataverseName, adapterName);
             // Searches the index for the tuple to be deleted. Acquires an S
             // lock on the 'Adapter' dataset.
-            ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
-                    searchKey);
+            ITupleReference datasetTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, datasetTuple);
 
             // TODO: Change this to be a BTree specific exception, e.g.,
@@ -1400,8 +1466,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName, adapterName);
-            DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getAdapterTupleTranslator(false);
+            DatasourceAdapterTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getAdapterTupleTranslator(false);
             List<DatasourceAdapter> results = new ArrayList<>();
             IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results);
@@ -1419,8 +1485,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             // Insert into the 'CompactionPolicy' dataset.
-            CompactionPolicyTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getCompactionPolicyTupleTranslator(true);
+            CompactionPolicyTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getCompactionPolicyTupleTranslator(true);
             ITupleReference compactionPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(compactionPolicy);
             insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, compactionPolicyTuple);
         } catch (HyracksDataException e) {
@@ -1440,8 +1506,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverse, policyName);
-            CompactionPolicyTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getCompactionPolicyTupleTranslator(false);
+            CompactionPolicyTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getCompactionPolicyTupleTranslator(false);
             List<CompactionPolicy> results = new ArrayList<>();
             IValueExtractor<CompactionPolicy> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             searchIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, searchKey, valueExtractor, results);
@@ -1459,8 +1525,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName);
-            DatasourceAdapterTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getAdapterTupleTranslator(false);
+            DatasourceAdapterTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getAdapterTupleTranslator(false);
             IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<DatasourceAdapter> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results);
@@ -1502,8 +1568,8 @@ public class MetadataNode implements IMetadataNode {
             ITupleReference searchKey = createTuple(dataverseName, libraryName);
             // Searches the index for the tuple to be deleted. Acquires an S
             // lock on the 'Adapter' dataset.
-            ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET,
-                    searchKey);
+            ITupleReference datasetTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, datasetTuple);
 
             // TODO: Change this to be a BTree specific exception, e.g.,
@@ -1630,8 +1696,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataverseName, feedName, datasetName);
-            ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET,
-                    searchKey);
+            ITupleReference tuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET, tuple);
         } catch (HyracksDataException | ACIDException e) {
             throw new MetadataException(e);
@@ -1734,8 +1800,8 @@ public class MetadataNode implements IMetadataNode {
     public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
         try {
             // Insert into the 'externalFiles' dataset.
-            ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getExternalFileTupleTranslator(true);
+            ExternalFileTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getExternalFileTupleTranslator(true);
             ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
             insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
         } catch (HyracksDataException e) {
@@ -1755,8 +1821,8 @@ public class MetadataNode implements IMetadataNode {
     public List<ExternalFile> getExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName());
-            ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getExternalFileTupleTranslator(false);
+            ExternalFileTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getExternalFileTupleTranslator(false);
             IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<ExternalFile> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
@@ -1774,8 +1840,8 @@ public class MetadataNode implements IMetadataNode {
             ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
             // Searches the index for the tuple to be deleted. Acquires an S
             // lock on the 'ExternalFile' dataset.
-            ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET,
-                    searchKey);
+            ITupleReference datasetTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetTuple);
         } catch (HyracksDataException e) {
             if (e.getComponent().equals(ErrorCode.HYRACKS)
@@ -1803,10 +1869,10 @@ public class MetadataNode implements IMetadataNode {
     @SuppressWarnings("unchecked")
     public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber)
             throws HyracksDataException {
-        ISerializerDeserializer<AString> stringSerde = SerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(BuiltinType.ASTRING);
-        ISerializerDeserializer<AInt32> intSerde = SerializerDeserializerProvider.INSTANCE
-                .getSerializerDeserializer(BuiltinType.AINT32);
+        ISerializerDeserializer<AString> stringSerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
+        ISerializerDeserializer<AInt32> intSerde =
+                SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
 
         AMutableString aString = new AMutableString("");
         ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3);
@@ -1835,8 +1901,8 @@ public class MetadataNode implements IMetadataNode {
             throws MetadataException, RemoteException {
         try {
             ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
-            ExternalFileTupleTranslator tupleReaderWriter = tupleTranslatorProvider
-                    .getExternalFileTupleTranslator(false);
+            ExternalFileTupleTranslator tupleReaderWriter =
+                    tupleTranslatorProvider.getExternalFileTupleTranslator(false);
             IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<>(tupleReaderWriter);
             List<ExternalFile> results = new ArrayList<>();
             searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
@@ -1858,8 +1924,8 @@ public class MetadataNode implements IMetadataNode {
             searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName());
             // Searches the index for the tuple to be deleted. Acquires an S
             // lock on the 'dataset' dataset.
-            ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET,
-                    searchKey);
+            ITupleReference datasetTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
             // Previous tuple was deleted
             // Insert into the 'dataset' dataset.
@@ -1876,10 +1942,10 @@ public class MetadataNode implements IMetadataNode {
         try {
             // remove old function
             ITupleReference searchKey;
-            searchKey = createTuple(function.getDataverseName(), function.getName(),
-                    Integer.toString(function.getArity()));
-            ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET,
-                    searchKey);
+            searchKey =
+                    createTuple(function.getDataverseName(), function.getName(), Integer.toString(function.getArity()));
+            ITupleReference functionTuple =
+                    getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey);
             deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple);
             // add new function
             FunctionTupleTranslator functionTupleTranslator = tupleTranslatorProvider.getFunctionTupleTranslator(true);

