asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amo...@apache.org
Subject [1/3] asterixdb git commit: Cleanup and bug fixes in Feeds pipeline
Date Wed, 08 Mar 2017 20:53:16 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master 31d8102aa -> 8c427cd4b


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 8f005d8..77020f8 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -22,8 +22,8 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -36,24 +36,24 @@ import org.apache.hyracks.util.IntSerDeUtils;
  * This appender must only be used on network boundary
  */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
-
-    private final IHyracksTaskContext ctx;
-    private static final int NULL_MESSAGE_SIZE = 1;
+    public static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
     public static final byte MARKER_MESSAGE = 0x03;
+
+    private final IHyracksTaskContext ctx;
     private boolean initialized = false;
-    private VSizeFrame message;
+    private IFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
-    public static void printMessage(VSizeFrame message, PrintStream out) throws HyracksDataException
{
+    public static void printMessage(IFrame message, PrintStream out) throws HyracksDataException
{
         out.println(getMessageString(message));
     }
 
-    public static String getMessageString(VSizeFrame message) throws HyracksDataException
{
+    public static String getMessageString(IFrame message) throws HyracksDataException {
         StringBuilder aString = new StringBuilder();
         aString.append("Message Type: ");
         switch (getMessageType(message)) {
@@ -76,7 +76,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         return aString.toString();
     }
 
-    public static byte getMessageType(VSizeFrame message) throws HyracksDataException {
+    public static byte getMessageType(IFrame message) throws HyracksDataException {
         switch (message.getBuffer().array()[0]) {
             case NULL_FEED_MESSAGE:
                 return NULL_FEED_MESSAGE;
@@ -105,15 +105,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender
{
 
     @Override
     public int getTupleCount() {
-        // if message is set, there is always a message. that message could be a null message
(TODO: optimize)
-        return tupleCount + ((message == null) ? 0 : 1);
+        return tupleCount + 1;
     }
 
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException
{
         if (!initialized) {
-            message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
-            initialized = true;
+            init();
         }
         // If message fits, we append it, otherwise, we append a null message, then send
a message only
         // frame with the message
@@ -125,7 +123,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         } else {
             ByteBuffer buffer = message.getBuffer();
             int messageSize = buffer.limit() - buffer.position();
-            if (hasEnoughSpace(1, messageSize)) {
+            if (hasEnoughSpace(0, messageSize)) {
                 appendMessage(buffer);
                 forward(outWriter);
             } else {
@@ -133,7 +131,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
                     appendNullMessage();
                     forward(outWriter);
                 }
-                if (!hasEnoughSpace(1, messageSize)) {
+                if (!hasEnoughSpace(0, messageSize)) {
                     frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize,
frame.getMinSize()));
                     reset(frame.getBuffer(), true);
                 }
@@ -143,6 +141,11 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         }
     }
 
+    private void init() {
+        message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+        initialized = true;
+    }
+
     private void forward(IFrameWriter outWriter) throws HyracksDataException {
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
@@ -168,4 +171,13 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
         ++tupleCount;
         IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()),
tupleCount);
     }
+
+    /*
+     * Always write and then flush to send out the message if exists
+     */
+    @Override
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        write(writer, true);
+        writer.flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 6d87d89..dbd3afa 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -41,25 +41,30 @@ public class PartitionDataWriter implements IFrameWriter {
     private final ITuplePartitionComputer tpc;
     private final IHyracksTaskContext ctx;
     private boolean[] allocatedFrames;
+    private boolean failed = false;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount, IPartitionWriterFactory
pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException
{
+        this.ctx = ctx;
+        this.tpc = tpc;
         this.consumerPartitionCount = consumerPartitionCount;
         pWriters = new IFrameWriter[consumerPartitionCount];
         isOpen = new boolean[consumerPartitionCount];
         allocatedFrames = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        initializeAppenders(pwFactory);
+    }
+
+    protected void initializeAppenders(IPartitionWriterFactory pwFactory) throws HyracksDataException
{
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
                 appenders[i] = createTupleAppender(ctx);
             } catch (IOException e) {
-                throw new HyracksDataException(e);
+                throw HyracksDataException.create(e);
             }
         }
-        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
-        this.tpc = tpc;
-        this.ctx = ctx;
     }
 
     protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
@@ -71,25 +76,17 @@ public class PartitionDataWriter implements IFrameWriter {
         HyracksDataException closeException = null;
         for (int i = 0; i < pWriters.length; ++i) {
             if (isOpen[i]) {
-                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0) {
+                if (allocatedFrames[i] && appenders[i].getTupleCount() > 0 &&
!failed) {
                     try {
                         appenders[i].write(pWriters[i], true);
                     } catch (Throwable th) {
-                        if (closeException == null) {
-                            closeException = new HyracksDataException(th);
-                        } else {
-                            closeException.addSuppressed(th);
-                        }
+                        closeException = HyracksDataException.suppress(closeException, th);
                     }
                 }
                 try {
                     pWriters[i].close();
                 } catch (Throwable th) {
-                    if (closeException == null) {
-                        closeException = new HyracksDataException(th);
-                    } else {
-                        closeException.addSuppressed(th);
-                    }
+                    closeException = HyracksDataException.suppress(closeException, th);
                 }
             }
         }
@@ -126,17 +123,14 @@ public class PartitionDataWriter implements IFrameWriter {
 
     @Override
     public void fail() throws HyracksDataException {
+        failed = true;
         HyracksDataException failException = null;
         for (int i = 0; i < appenders.length; ++i) {
             if (isOpen[i]) {
                 try {
                     pWriters[i].fail();
                 } catch (Throwable th) {
-                    if (failException == null) {
-                        failException = new HyracksDataException(th);
-                    } else {
-                        failException.addSuppressed(th);
-                    }
+                    failException = HyracksDataException.suppress(failException, th);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
index a985b4d..b89922e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallbackFactory.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.storage.common.file.LocalResource;
 
 @FunctionalInterface
 public interface IModificationOperationCallbackFactory extends Serializable {
-    public IModificationOperationCallback createModificationOperationCallback(LocalResource
resource,
+    IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
             IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index 627994c..a19e69a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -23,10 +23,6 @@ import org.apache.hyracks.data.std.api.IValueReference;
 public class MutableArrayValueReference implements IValueReference {
     private byte[] array;
 
-    public MutableArrayValueReference() {
-        //mutable array. user doesn't need to specify the array in advance
-    }
-
     public MutableArrayValueReference(byte[] array) {
         this.array = array;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
new file mode 100644
index 0000000..de72690
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * An interface that is used to enable frame level operation on indexes
+ */
+@FunctionalInterface
+public interface IFrameOperationCallback {
+    /**
+     * Called once processing the frame is done before calling nextFrame on the next IFrameWriter
in
+     * the pipeline
+     *
+     * @param modified
+     *            true if the index was modified during the processing of the frame, false
otherwise
+     * @throws HyracksDataException
+     */
+    void frameCompleted(boolean modified) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
new file mode 100644
index 0000000..8031d32
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallbackFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.api;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A factory for {@link IFrameOperationCallback}
+ */
+@FunctionalInterface
+public interface IFrameOperationCallbackFactory extends Serializable {
+    /**
+     * Create a {@link IFrameOperationCallback} for an index operator
+     *
+     * @param ctx
+     *            the task context
+     * @param indexAccessor
+     *            the accessor for the index
+     * @return an instance of {@link IFrameOperationCallback}
+     */
+    IFrameOperationCallback createFrameOperationCallback(IHyracksTaskContext ctx, ILSMIndexAccessor
indexAccessor);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 7a2bc7c..f21c8a3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -28,34 +29,174 @@ import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMHarness {
 
+    /**
+     * Force modification even if memory component is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param tuple
+     *            the operation tuple
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
IndexException;
 
+    /**
+     * Modify the index if the memory component is not full, wait for a new memory component
if the current one is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param tryOperation
+     *            true if IO operation
+     * @param tuple
+     *            the operation tuple
+     * @return
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Search the index
+     *
+     * @param ctx
+     *            the search operation context
+     * @param cursor
+     *            the index cursor
+     * @param pred
+     *            the search predicate
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
             throws HyracksDataException, IndexException;
 
+    /**
+     * End the search
+     *
+     * @param ctx
+     * @throws HyracksDataException
+     */
     void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
+    /**
+     * Schedule a merge
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule full merge
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Perform a merge operation
+     *
+     * @param ctx
+     * @param operation
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
 
+    /**
+     * Schedule a flush
+     *
+     * @param ctx
+     * @param callback
+     * @throws HyracksDataException
+     */
     void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback) throws
HyracksDataException;
 
+    /**
+     * Perform a flush
+     *
+     * @param ctx
+     * @param operation
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
IndexException;
 
+    /**
+     * Add bulk loaded component
+     *
+     * @param index
+     *            the new component
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void addBulkLoadedComponent(ILSMDiskComponent index) throws HyracksDataException, IndexException;
 
+    /**
+     * Get index operation tracker
+     */
     ILSMOperationTracker getOperationTracker();
 
+    /**
+     * Schedule replication
+     *
+     * @param ctx
+     *            the operation context
+     * @param diskComponents
+     *            the disk component to be replicated
+     * @param bulkload
+     *            true if the components were bulk loaded, false otherwise
+     * @param opType
+     *            The operation type
+     * @throws HyracksDataException
+     */
     void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMDiskComponent>
diskComponents, boolean bulkload,
             LSMOperationType opType) throws HyracksDataException;
 
+    /**
+     * End a replication operation
+     *
+     * @param ctx
+     *            the operation context
+     * @throws HyracksDataException
+     */
     void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
+
+    /**
+     * Update the metadata of the memory component of the index. Waiting for a new memory
component if
+     * the current memory component is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param key
+     *            the meta key
+     * @param value
+     *            the meta value
+     * @throws HyracksDataException
+     */
+    void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference value)
+            throws HyracksDataException;
+
+    /**
+     * Force updating the metadata of the memory component of the index even if memory component
is full
+     *
+     * @param ctx
+     *            the operation context
+     * @param key
+     *            the meta key
+     * @param value
+     *            the meta value
+     * @throws HyracksDataException
+     */
+    void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference
value)
+            throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fecc674..90c70aa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.common.api;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
 import org.apache.hyracks.storage.am.common.api.IndexException;
@@ -34,16 +35,43 @@ import org.apache.hyracks.storage.am.common.api.TreeIndexException;
  * concurrent operations).
  */
 public interface ILSMIndexAccessor extends IIndexAccessor {
+    /**
+     * Schedule a flush operation
+     *
+     * @param callback
+     *            the IO operation callback
+     * @throws HyracksDataException
+     */
     void scheduleFlush(ILSMIOOperationCallback callback) throws HyracksDataException;
 
+    /**
+     * Schedule a merge operation
+     *
+     * @param callback
+     *            the merge operation callback
+     * @param components
+     *            the components to be merged
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleMerge(ILSMIOOperationCallback callback, List<ILSMDiskComponent> components)
             throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule a full merge
+     *
+     * @param callback
+     *            the merge operation callback
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void scheduleFullMerge(ILSMIOOperationCallback callback) throws HyracksDataException,
IndexException;
 
     /**
-     * Deletes the tuple from the memory component only.
+     * Delete the tuple from the memory component only. Don't replace with antimatter tuple
      *
+     * @param tuple
+     *            the tuple to be deleted
      * @throws HyracksDataException
      * @throws IndexException
      */
@@ -113,12 +141,49 @@ public interface ILSMIndexAccessor extends IIndexAccessor {
      */
     boolean tryUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Delete the tuple from the memory component only. Don't replace with antimatter tuple
+     * Perform operation even if the memory component is full
+     *
+     * @param tuple
+     *            the tuple to delete
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forcePhysicalDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Insert a new tuple (failing if duplicate key entry is found)
+     *
+     * @param tuple
+     *            the tuple to insert
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceInsert(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Force deleting an index entry even if the memory component is full
+     * replace the entry if found with an antimatter tuple, otherwise, simply insert the
antimatter tuple
+     *
+     * @param tuple
+     *            tuple to delete
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
     void forceDelete(ITupleReference tuple) throws HyracksDataException, IndexException;
 
+    /**
+     * Schedule a replication for disk components
+     *
+     * @param diskComponents
+     *            the components to be replicated
+     * @param bulkload
+     *            true if the components were bulkloaded, false otherwise
+     * @param opType
+     *            the operation type
+     * @throws HyracksDataException
+     */
     void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload,
LSMOperationType opType)
             throws HyracksDataException;
 
@@ -137,4 +202,24 @@ public interface ILSMIndexAccessor extends IIndexAccessor {
      * @throws TreeIndexException
      */
     void merge(ILSMIOOperation operation) throws HyracksDataException, IndexException;
+
+    /**
+     * Update the metadata of the memory component, wait for the new component if the current
one is UNWRITABLE
+     *
+     * @param key
+     *            the key
+     * @param value
+     *            the value
+     * @throws HyracksDataException
+     */
+    void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
+
+    /**
+     * Force update the metadata of the current memory component even if it is UNWRITABLE
+     *
+     * @param key
+     * @param value
+     * @throws HyracksDataException
+     */
+    void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6bf9312..01e85d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -28,16 +28,17 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -360,6 +361,44 @@ public class LSMHarness implements ILSMHarness {
         return modify(ctx, tryOperation, tuple, opType);
     }
 
+    @Override
+    public void updateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference
value)
+            throws HyracksDataException {
+        if (!lsmIndex.isMemoryComponentsAllocated()) {
+            lsmIndex.allocateMemoryComponents();
+        }
+        getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false);
+        try {
+            lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+        } finally {
+            exitAndComplete(ctx, LSMOperationType.MODIFICATION);
+        }
+    }
+
+    private void exitAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) throws
HyracksDataException {
+        try {
+            exitComponents(ctx, op, null, false);
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        } finally {
+            opTracker.completeOperation(null, op, null, ctx.getModificationCallback());
+        }
+    }
+
+    @Override
+    public void forceUpdateMeta(ILSMIndexOperationContext ctx, IValueReference key, IValueReference
value)
+            throws HyracksDataException {
+        if (!lsmIndex.isMemoryComponentsAllocated()) {
+            lsmIndex.allocateMemoryComponents();
+        }
+        getAndEnterComponents(ctx, LSMOperationType.FORCE_MODIFICATION, false);
+        try {
+            lsmIndex.getCurrentMemoryComponent().getMetadata().put(key, value);
+        } finally {
+            exitAndComplete(ctx, LSMOperationType.FORCE_MODIFICATION);
+        }
+    }
+
     private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference
tuple,
             LSMOperationType opType) throws HyracksDataException, IndexException {
         if (!lsmIndex.isMemoryComponentsAllocated()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 4199cfb..0fa69ec 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -164,4 +165,18 @@ public abstract class LSMTreeIndexAccessor implements ILSMIndexAccessor
{
         ctx.setOperation(IndexOperation.DELETE);
         lsmHarness.forceModify(ctx, tuple);
     }
+
+    @Override
+    public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException
{
+     // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.updateMeta(ctx,key,value);
+    }
+
+    @Override
+    public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException
{
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.forceUpdateMeta(ctx, key, value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8c427cd4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index cef4257..0a6ffd7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
 import java.util.List;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndexCursor;
 import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
@@ -185,4 +186,18 @@ public class LSMInvertedIndexAccessor implements ILSMIndexAccessor, IInvertedInd
         throw new UnsupportedOperationException("Cannot open inverted list cursor on lsm
inverted index.");
     }
 
+    @Override
+    public void updateMeta(IValueReference key, IValueReference value) throws HyracksDataException
{
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.updateMeta(ctx, key, value);
+    }
+
+    @Override
+    public void forceUpdateMeta(IValueReference key, IValueReference value) throws HyracksDataException
{
+        // a hack because delete only gets the memory component
+        ctx.setOperation(IndexOperation.DELETE);
+        lsmHarness.forceUpdateMeta(ctx, key, value);
+    }
+
 }


Mime
View raw message