mnemonic-commits mailing list archives

From ga...@apache.org
Subject incubator-mnemonic git commit: MNEMONIC-193: Extract core functions from MneMapreduceRecordWriter
Date Wed, 15 Feb 2017 22:31:29 GMT
Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 4ad3572ec -> 408625f56


MNEMONIC-193: Extract core functions from MneMapreduceRecordWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/408625f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/408625f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/408625f5

Branch: refs/heads/master
Commit: 408625f56405859d7b5cdaf572110b49163d0964
Parents: 4ad3572
Author: Wang, Gang(Gary) <gang1.wang@intel.com>
Authored: Tue Feb 14 23:22:17 2017 -0800
Committer: Wang, Gang(Gary) <gang1.wang@intel.com>
Committed: Wed Feb 15 14:25:24 2017 -0800

----------------------------------------------------------------------
 .../apache/mnemonic/hadoop/MneConfigHelper.java |  15 +-
 .../mnemonic/hadoop/MneDurableComputable.java   |  28 ++
 .../hadoop/MneDurableOutputSession.java         | 282 +++++++++++++++++++
 .../mnemonic/hadoop/MneDurableOutputValue.java  |  68 +++++
 .../hadoop/mapreduce/MneDurableComputable.java  |  28 --
 .../mapreduce/MneMapreduceRecordReader.java     |   9 +-
 .../mapreduce/MneMapreduceRecordWriter.java     | 177 +-----------
 .../hadoop/mapreduce/MneOutputFormat.java       |  10 +-
 .../mnemonic/mapreduce/MneMapreduceIOTest.java  |  39 ++-
 .../org/apache/mnemonic/mapreduce/Person.java   |  15 +-
 10 files changed, 427 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
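In brief, this change moves the memory-pool and record-chaining logic out of MneMapreduceRecordWriter into a reusable MneDurableOutputSession, and introduces MneDurableOutputValue as the value type handed to the writer. A minimal sketch of the resulting write path, condensed from the updated MneMapreduceIOTest below (Person is the test entity; taskAttemptContext is assumed to already carry the mnemonic.output.* settings):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.mnemonic.hadoop.MneConfigHelper;
    import org.apache.mnemonic.hadoop.MneDurableOutputSession;
    import org.apache.mnemonic.hadoop.MneDurableOutputValue;
    import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;

    // Inside a task (or test) method:
    MneDurableOutputSession<Person<Long>> sess =
        new MneDurableOutputSession<Person<Long>>(taskAttemptContext);
    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
    sess.initNextPool();
    MneDurableOutputValue<Person<Long>> mdvalue =
        new MneDurableOutputValue<Person<Long>>(sess);
    RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
        new MneOutputFormat<MneDurableOutputValue<Person<Long>>>()
            .getRecordWriter(taskAttemptContext);
    Person<Long> person = sess.newDurableObjectRecord(); // allocated in the pool
    person.setAge((short) 30);
    writer.write(NullWritable.get(), mdvalue.of(person)); // delegates to sess.post()
    writer.close(taskAttemptContext);
    sess.close();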


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
index be78709..a91f00f 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneConfigHelper.java
@@ -33,22 +33,29 @@ import org.slf4j.LoggerFactory;
  */
 public class MneConfigHelper {
 
-  public static final String OUTPUT_CONFIG_PREFIX_DEFAULT = "mnemonic.output.";
-  public static final String INPUT_CONFIG_PREFIX_DEFAULT = "mnemonic.input.";
+  public static final String DEFAULT_OUTPUT_CONFIG_PREFIX = "mnemonic.output.";
+  public static final String DEFAULT_INPUT_CONFIG_PREFIX = "mnemonic.input.";
   public static final String DURABLE_TYPES = "durable.types";
   public static final String ENTITY_FACTORY_PROXIES = "entity.factory.proxies.class";
   public static final String SLOT_KEY_ID = "slot.key.id";
   public static final String MEM_SERVICE_NAME = "mem.service.name";
   public static final String MEM_POOL_SIZE = "mem.pool.size";
-  private static final long DEFAULT_OUTPUT_MEM_POOL_SIZE = 1024L * 1024 * 1024 * 4;
-  public static final String FILE_EXTENSION = ".mne";
+  public static final long DEFAULT_OUTPUT_MEM_POOL_SIZE = 1024L * 1024 * 1024 * 4;
+  public static final String DEFAULT_NAME_PART = "part";
+  public static final String DEFAULT_FILE_EXTENSION = ".mne";
+  public static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(MneConfigHelper.class);
 
   public static String getConfigName(String prefix, String partname) {
+    prefix = null == prefix ? "" : prefix;
     return prefix + partname;
   }
 
+  public static String getBaseOutputName(Configuration conf, String prefix) {
+    return conf.get(getConfigName(prefix, BASE_OUTPUT_NAME), DEFAULT_NAME_PART);
+  }
+
   public static void setDurableTypes(Configuration conf, String prefix, DurableType[] dtypes) {
     String val = StringUtils.join(dtypes, ",");
     conf.set(getConfigName(prefix, DURABLE_TYPES), val);

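The null guard added to getConfigName() matters because readConfig() in the new MneDurableOutputSession (below) looks up the base output name with a null prefix. For illustration, using the constants defined above:

    // Prefixed key, as used for most settings:
    MneConfigHelper.getConfigName(
        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, MneConfigHelper.DURABLE_TYPES);
    //   -> "mnemonic.output.durable.types"

    // A null prefix is treated as "", so the standard Hadoop key is read unprefixed:
    MneConfigHelper.getConfigName(null, MneConfigHelper.BASE_OUTPUT_NAME);
    //   -> "mapreduce.output.basename"
    // getBaseOutputName() falls back to DEFAULT_NAME_PART ("part") when unset.
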
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableComputable.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableComputable.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableComputable.java
new file mode 100644
index 0000000..eb23e65
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableComputable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import org.apache.mnemonic.CommonAllocator;
+
+public interface MneDurableComputable<A extends CommonAllocator<A>> {
+
+  A getAllocator();
+  
+  long getHandler();
+}

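This is the same interface as before, relocated from org.apache.mnemonic.hadoop.mapreduce to org.apache.mnemonic.hadoop so that non-mapreduce classes such as MneDurableOutputSession (below) can implement it. A hypothetical caller, not part of this patch, can stay generic over the allocator type:

    // Hypothetical helper for illustration only:
    static <A extends CommonAllocator<A>> long rootHandlerOf(MneDurableComputable<A> c) {
      return c.getHandler(); // 0L until a first record has been posted
    }
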
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
new file mode 100644
index 0000000..e2c3a57
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import org.apache.mnemonic.ConfigurationException;
+import org.apache.mnemonic.Durable;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.EntityFactoryProxy;
+import org.apache.mnemonic.NonVolatileMemAllocator;
+import org.apache.mnemonic.OutOfHybridMemory;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.collections.DurableSinglyLinkedList;
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class MneDurableOutputSession<V>
+    implements MneDurableComputable<NonVolatileMemAllocator> {
+
+  private long poolSize;
+  private TaskAttemptContext taskAttemptContext;
+  private String serviceName;
+  private DurableType[] durableTypes;
+  private EntityFactoryProxy[] entityFactoryProxies;
+  private long slotKeyId;
+  private String baseOutputName;
+  private Path outputPath;
+
+  protected Map<V, DurableSinglyLinkedList<V>> m_recordmap;
+  protected boolean m_newpool;
+  protected long m_poolidx = 0L;
+  protected Pair<DurableType[], EntityFactoryProxy[]> m_recparmpair;
+  protected DurableSinglyLinkedList<V> m_listnode;
+  protected NonVolatileMemAllocator m_act;
+  protected Iterator<V> m_iter;
+
+  public MneDurableOutputSession(TaskAttemptContext taskAttemptContext) {
+    setTaskAttemptContext(taskAttemptContext);
+    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
+  }
+
+  public void validateConfig() {
+    if (getDurableTypes().length < 1) {
+      throw new ConfigurationException("The durable type of record parameters does not exist");
+    } else {
+      if (DurableType.DURABLE == getDurableTypes()[0]
+          && getEntityFactoryProxies().length < 1) { /* T.B.D. BUFFER & CHUNK */
+        throw new ConfigurationException("The durable entity proxy of record parameters does not exist");
+      }
+    }
+  }
+  
+  public void readConfig(String prefix) {
+    if (getTaskAttemptContext() == null) {
+      throw new ConfigurationException("taskAttemptContext has not yet been set");
+    }
+    Configuration conf = getTaskAttemptContext().getConfiguration();
+    setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
+    setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
+    setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
+        MneConfigHelper.getEntityFactoryProxies(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX)));
+    m_recparmpair = Utils.shiftDurableParams(getDurableTypes(), getEntityFactoryProxies(), 1);
+    setSlotKeyId(MneConfigHelper.getSlotKeyId(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
+    setPoolSize(MneConfigHelper.getMemPoolSize(conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX));
+    setBaseOutputName(MneConfigHelper.getBaseOutputName(conf, null));
+    validateConfig();
+  }
+
+  protected Path genNextPoolPath() {
+    Path ret = new Path(FileOutputFormat.getOutputPath(getTaskAttemptContext()),
+        FileOutputFormat.getUniqueFile(getTaskAttemptContext(),
+            String.format("%s-%05d", getBaseOutputName(), ++m_poolidx), MneConfigHelper.DEFAULT_FILE_EXTENSION));
+    return ret;
+  }
+
+  public void initNextPool() {
+    if (m_act != null) {
+      m_act.close();
+    }
+    setOutputPath(genNextPoolPath());
+    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), getPoolSize(),
+        getOutputPath().toString(), true);
+    m_newpool = true;
+  }
+
+  @Override
+  public NonVolatileMemAllocator getAllocator() {
+    return m_act;
+  }
+
+  @Override
+  public long getHandler() {
+    long ret = 0L;
+    if (null != m_listnode) {
+      ret = m_listnode.getHandler();
+    }
+    return ret;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected V createDurableObjectRecord() {
+    V ret = null;
+    ret = (V) getEntityFactoryProxies()[0].create(m_act, m_recparmpair.getRight(), m_recparmpair.getLeft(), false);
+    return ret;
+  }
+
+  public V newDurableObjectRecord() {
+    V ret = null;
+    DurableSinglyLinkedList<V> nv = null;
+    try {
+      nv = createDurableNode();
+      ret = createDurableObjectRecord();
+    } catch (OutOfHybridMemory e) {
+      if (nv != null) {
+        nv.destroy();
+      }
+      if (ret != null) {
+        ((Durable) ret).destroy();
+      }
+      initNextPool();
+      try { /* retry */
+        nv = createDurableNode();
+        ret = createDurableObjectRecord();
+      } catch (OutOfHybridMemory ee) {
+        if (nv != null) {
+          nv.destroy();
+        }
+        if (ret != null) {
+          ((Durable) ret).destroy();
+        }
+      }
+    }
+    if (ret != null) {
+      m_recordmap.put(ret, nv);
+    }
+    return ret;
+  }
+
+  protected DurableSinglyLinkedList<V> createDurableNode() {
+    DurableSinglyLinkedList<V> ret = null;
+    ret = DurableSinglyLinkedListFactory.create(m_act, getEntityFactoryProxies(), getDurableTypes(), false);
+    return ret;
+  }
+
+  public void post(V v) {
+    DurableSinglyLinkedList<V> nv = null;
+    if (null == v) {
+      return;
+    }
+    if (DurableType.DURABLE == getDurableTypes()[0]) {
+      if (m_recordmap.containsKey(v)) {
+        nv = m_recordmap.remove(v);
+      } else {
+        throw new RuntimeException("The record hasn't been created by newDurableObjectRecord()");
+      }
+    } else {
+      try {
+        nv = createDurableNode();
+      } catch (OutOfHybridMemory e) {
+        initNextPool();
+        nv = createDurableNode();
+      }
+    }
+    if (nv != null) {
+      nv.setItem(v, false);
+    }
+    if (m_newpool) {
+      m_act.setHandler(getSlotKeyId(), nv.getHandler());
+      m_newpool = false;
+    } else {
+      m_listnode.setNext(nv, false);
+    }
+    m_listnode = nv;
+  }
+
+  public void destroyPendingRecord(V k) {
+    if (m_recordmap.containsKey(k)) {
+      m_recordmap.get(k).destroy();
+      ((Durable) k).destroy();
+    }
+  }
+
+  public void destroyAllPendingRecords() {
+    for (V k : m_recordmap.keySet()) {
+      destroyPendingRecord(k);
+    }
+  }
+
+  public void close() {
+    destroyAllPendingRecords();
+    m_act.close();
+  }
+
+  public long getSlotKeyId() {
+    return slotKeyId;
+  }
+
+  public void setSlotKeyId(long slotKeyId) {
+    this.slotKeyId = slotKeyId;
+  }
+
+  public EntityFactoryProxy[] getEntityFactoryProxies() {
+    return entityFactoryProxies;
+  }
+
+  public void setEntityFactoryProxies(EntityFactoryProxy[] entityFactoryProxies) {
+    this.entityFactoryProxies = entityFactoryProxies;
+  }
+
+  public DurableType[] getDurableTypes() {
+    return durableTypes;
+  }
+
+  public void setDurableTypes(DurableType[] durableTypes) {
+    this.durableTypes = durableTypes;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public long getPoolSize() {
+    return poolSize;
+  }
+
+  public Path getOutputPath() {
+    return outputPath;
+  }
+
+  public void setOutputPath(Path outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public void setPoolSize(long poolSize) {
+    this.poolSize = poolSize;
+  }
+
+  public TaskAttemptContext getTaskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public void setTaskAttemptContext(TaskAttemptContext taskAttemptContext) {
+    this.taskAttemptContext = taskAttemptContext;
+  }
+
+  public String getBaseOutputName() {
+    return baseOutputName;
+  }
+
+  public void setBaseOutputName(String baseOutputName) {
+    this.baseOutputName = baseOutputName;
+  }
+
+}

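The session owns the allocator lifecycle: initNextPool() closes the current pool (if any) and opens a new .mne file at a unique path, and both newDurableObjectRecord() and post() catch OutOfHybridMemory, destroy any partially created objects, roll over to a fresh pool, and retry once. The first node posted into a fresh pool is registered under the configured slot key id; later nodes are chained via setNext(). From the caller's side, pool rollover is transparent, e.g. (a sketch, assuming sess was set up as in the test below; recordCount is arbitrary):

    for (int i = 0; i < recordCount; ++i) {
      Person<Long> p = sess.newDurableObjectRecord(); // may roll to a new pool file
      p.setAge((short) i);
      sess.post(p); // links p into the current pool's durable singly linked list
    }
    sess.close(); // destroys records created but never posted, then closes the pool
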
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputValue.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputValue.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputValue.java
new file mode 100644
index 0000000..8b4b2bd
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputValue.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class MneDurableOutputValue<V>
+    implements Writable {
+
+  protected MneDurableOutputSession<V> m_session;
+  protected V m_value;
+
+  public MneDurableOutputValue(MneDurableOutputSession<V> sess) {
+    m_session = sess;
+  }
+
+  public MneDurableOutputValue(MneDurableOutputSession<V> sess, V value) {
+    m_session = sess;
+    m_value = value;
+  }
+
+  public MneDurableOutputSession<V> getSession() {
+    return m_session;
+  }
+
+  public MneDurableOutputValue<V> of(V value) {
+    m_value = value;
+    return this;
+  }
+
+  public V getValue() {
+    return m_value;
+  }
+  
+  public void post() {
+    m_session.post(m_value);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}

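MneDurableOutputValue is a thin Writable-shaped envelope: write() and readFields() deliberately throw UnsupportedOperationException because the data is persisted through the session's memory pool, never serialized through DataOutput. The fluent of() lets a single envelope be reused for every record, as the updated test does:

    // One envelope per task, reused across writes (pattern from MneMapreduceIOTest):
    MneDurableOutputValue<Person<Long>> envelope =
        new MneDurableOutputValue<Person<Long>>(sess);
    writer.write(NullWritable.get(), envelope.of(personA));
    writer.write(NullWritable.get(), envelope.of(personB)); // same object, new payload
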
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneDurableComputable.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneDurableComputable.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneDurableComputable.java
deleted file mode 100644
index c427454..0000000
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneDurableComputable.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mnemonic.hadoop.mapreduce;
-
-import org.apache.mnemonic.CommonAllocator;
-
-public interface MneDurableComputable<A extends CommonAllocator<A>> {
-
-  A getAllocator();
-  
-  long getHandler();
-}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index 3743916..f253cbc 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -32,6 +32,7 @@ import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableComputable;
 
 /**
  * This record reader implements the org.apache.hadoop.mapreduce API.
@@ -65,11 +66,11 @@ public class MneMapreduceRecordReader<V>
     FileSplit split = (FileSplit) inputSplit;
     m_context = context;
     m_conf = m_context.getConfiguration();
-    m_msvrname = MneConfigHelper.getMemServiceName(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT);
-    m_gtypes = MneConfigHelper.getDurableTypes(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT);
+    m_msvrname = MneConfigHelper.getMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    m_gtypes = MneConfigHelper.getDurableTypes(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
     m_efproxies = Utils.instantiateEntityFactoryProxies(
-        MneConfigHelper.getEntityFactoryProxies(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT));
-    m_slotkeyid = MneConfigHelper.getSlotKeyId(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT);
+        MneConfigHelper.getEntityFactoryProxies(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
+    m_slotkeyid = MneConfigHelper.getSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
     
     DurableSinglyLinkedList<V> dsllist;
 

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordWriter.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordWriter.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordWriter.java
index 242ee46..fb8ede0 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordWriter.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordWriter.java
@@ -19,189 +19,20 @@
 package org.apache.mnemonic.hadoop.mapreduce;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.mnemonic.ConfigurationException;
-import org.apache.mnemonic.Durable;
-import org.apache.mnemonic.DurableType;
-import org.apache.mnemonic.EntityFactoryProxy;
-import org.apache.mnemonic.NonVolatileMemAllocator;
-import org.apache.mnemonic.OutOfHybridMemory;
-import org.apache.mnemonic.Utils;
-import org.apache.mnemonic.hadoop.MneConfigHelper;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
-import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 
-public class MneMapreduceRecordWriter<V> extends RecordWriter<NullWritable, V>
-    implements MneDurableComputable<NonVolatileMemAllocator> {
-
-  protected Configuration m_conf;
-  protected TaskAttemptContext m_context;
-  protected NonVolatileMemAllocator m_act;
-  protected Iterator<V> m_iter;
-  protected long m_poolsz;
-  protected long m_slotkeyid;
-  protected DurableType[] m_gtypes;
-  protected EntityFactoryProxy[] m_efproxies;
-  protected String m_msvrname;
-  protected long m_poolidx = 0;
-  protected String m_outbname;
-  protected String m_outext;
-  protected Map<V, DurableSinglyLinkedList<V>> m_recordmap;
-  protected boolean m_newpool;
-  protected Pair<DurableType[], EntityFactoryProxy[]> m_recparmpair;
-  protected DurableSinglyLinkedList<V> m_listnode;
-
-  public MneMapreduceRecordWriter(TaskAttemptContext context, String outbname, String extension) {
-    this(context.getConfiguration());
-    m_context = context;
-    m_outbname = outbname;
-    m_outext = extension;
-    initNextPool();
-  }
-
-  protected MneMapreduceRecordWriter(Configuration conf) {
-    m_conf = conf;
-    m_msvrname = MneConfigHelper.getMemServiceName(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT);
-    m_gtypes = MneConfigHelper.getDurableTypes(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT);
-    m_efproxies = Utils.instantiateEntityFactoryProxies(
-        MneConfigHelper.getEntityFactoryProxies(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT));
-    m_recparmpair = Utils.shiftDurableParams(m_gtypes, m_efproxies, 1);
-    m_slotkeyid = MneConfigHelper.getSlotKeyId(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT);
-    m_poolsz = MneConfigHelper.getMemPoolSize(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT);
-    m_recordmap = new HashMap<V, DurableSinglyLinkedList<V>>();
-    if (m_gtypes.length < 1) {
-      throw new ConfigurationException("The durable type of record parameters does not exist");
-    } else {
-      if (DurableType.DURABLE == m_gtypes[0]
-          && m_efproxies.length < 1) { /* T.B.D. BUFFER & CHUNK */
-        throw new ConfigurationException("The durable entity proxy of record parameters does not exist");
-      }
-    }
-  }
-
-  protected Path genNextPoolPath() {
-    Path ret = new Path(FileOutputFormat.getOutputPath(m_context),
-        FileOutputFormat.getUniqueFile(m_context, String.format("%s-%05d", m_outbname, ++m_poolidx), m_outext));
-    return ret;
-  }
-
-  protected void initNextPool() {
-    if (m_act != null) {
-      m_act.close();
-    }
-    Path outpath = genNextPoolPath();
-    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(m_msvrname), m_poolsz,
-        outpath.toString(), true);
-    m_newpool = true;
-  }
-
-  @Override
-  public NonVolatileMemAllocator getAllocator() {
-    return m_act;
-  }
-
-  @Override
-  public long getHandler() {
-    long ret = 0L;
-    if (null != m_listnode) {
-      m_listnode.getHandler();
-    }
-    return ret;
-  }
-
-  @SuppressWarnings("unchecked")
-  protected V createDurableObjectRecord() {
-    V ret = null;
-    ret = (V) m_efproxies[0].create(m_act, m_recparmpair.getRight(), m_recparmpair.getLeft(), false);
-    return ret;
-  }
-
-  public V newDurableObjectRecord() {
-    V ret = null;
-    DurableSinglyLinkedList<V> nv = null;
-    try {
-      nv = createDurableNode();
-      ret = createDurableObjectRecord();
-    } catch (OutOfHybridMemory e) {
-      if (nv != null) {
-        nv.destroy();
-      }
-      if (ret != null) {
-        ((Durable) ret).destroy();
-      }
-      initNextPool();
-      try { /* retry */
-        nv = createDurableNode();
-        ret = createDurableObjectRecord();
-      } catch (OutOfHybridMemory ee) {
-        if (nv != null) {
-          nv.destroy();
-        }
-        if (ret != null) {
-          ((Durable) ret).destroy();
-        }
-      }
-    }
-    if (ret != null) {
-      m_recordmap.put(ret, nv);
-    }
-    return ret;
-  }
-
-  protected DurableSinglyLinkedList<V> createDurableNode() {
-    DurableSinglyLinkedList<V> ret = null;
-    ret = DurableSinglyLinkedListFactory.create(m_act, m_efproxies, m_gtypes, false);
-    return ret;
-  }
+public class MneMapreduceRecordWriter<MV extends MneDurableOutputValue<?>> extends RecordWriter<NullWritable, MV> {
 
   @Override
-  public void write(NullWritable nullWritable, V v) throws IOException {
-    DurableSinglyLinkedList<V> nv = null;
-    if (null == v) {
-      return;
-    }
-    if (DurableType.DURABLE == m_gtypes[0]) {
-      if (m_recordmap.containsKey(v)) {
-        nv = m_recordmap.remove(v);
-      } else {
-        throw new RuntimeException("The record hasn't been created by newDurableObjectRecord()");
-      }
-    } else {
-      try {
-        nv = createDurableNode();
-      } catch (OutOfHybridMemory e) {
-        initNextPool();
-        nv = createDurableNode();
-      }
-    }
-    if (nv != null) {
-      nv.setItem(v, false);
-    }
-    if (m_newpool) {
-      m_act.setHandler(m_slotkeyid, nv.getHandler());
-      m_newpool = false;
-    } else {
-      m_listnode.setNext(nv, false);
-    }
-    m_listnode = nv;
+  public void write(NullWritable nullWritable, MV mdvalue) throws IOException {
+    mdvalue.post();
   }
 
   @Override
   public void close(TaskAttemptContext taskAttemptContext) throws IOException {
-    for (V k : m_recordmap.keySet()) {
-      m_recordmap.get(k).destroy();
-      ((Durable) k).destroy();
-    }
-    m_act.close();
   }
 }

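After the extraction the record writer is stateless: write() just forwards to the value's post(), and close() is a no-op because the session, not the writer, owns the allocator. Callers are therefore responsible for closing the session after the writer, as the updated test does:

    writer.close(taskAttemptContext); // no-op; kept for the RecordWriter contract
    sess.close();                     // actually releases the memory pool
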
http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneOutputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneOutputFormat.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneOutputFormat.java
index 143243c..34decbe 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneOutputFormat.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneOutputFormat.java
@@ -21,21 +21,19 @@ package org.apache.mnemonic.hadoop.mapreduce;
 import java.io.IOException;
 
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 
 /**
  * A Mnemonic output format that satisfies the org.apache.hadoop.mapreduce API.
  */
-public class MneOutputFormat<V extends Writable> extends FileOutputFormat<NullWritable, V> {
+public class MneOutputFormat<MV extends MneDurableOutputValue<?>> extends FileOutputFormat<NullWritable, MV> {
 
   @Override
-  public RecordWriter<NullWritable, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
-    return new MneMapreduceRecordWriter<V>(taskAttemptContext, getOutputName(taskAttemptContext),
-        MneConfigHelper.FILE_EXTENSION);
+  public RecordWriter<NullWritable, MV> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
+    return new MneMapreduceRecordWriter<MV>();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
index 33e7697..41ac8f5 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
@@ -38,8 +38,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
-import org.apache.mnemonic.hadoop.mapreduce.MneMapreduceRecordWriter;
 import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
 import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
@@ -79,20 +80,20 @@ public class MneMapreduceIOTest {
 
     m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
 
-    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT, SERVICE_NAME);
-    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT, SLOT_KEY_ID);
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT, new DurableType[] {DurableType.DURABLE});
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.INPUT_CONFIG_PREFIX_DEFAULT, new Class<?>[] {PersonListEFProxy.class});
-    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT, SERVICE_NAME);
-    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT, SLOT_KEY_ID);
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
     MneConfigHelper.setMemPoolSize(m_conf,
-        MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT, 1024L * 1024 * 1024 * 4);
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
     MneConfigHelper.setDurableTypes(m_conf,
-        MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT, new DurableType[] {DurableType.DURABLE});
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.DURABLE});
     MneConfigHelper.setEntityFactoryProxies(m_conf,
-        MneConfigHelper.OUTPUT_CONFIG_PREFIX_DEFAULT, new Class<?>[] {PersonListEFProxy.class});
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {PersonListEFProxy.class});
   }
 
   @AfterClass
@@ -103,17 +104,25 @@ public class MneMapreduceIOTest {
   @Test(enabled = true)
   public void testWritePersonData() throws Exception {
     NullWritable nada = NullWritable.get();
-    OutputFormat<NullWritable, Person<Long>> outputFormat = new MneOutputFormat<Person<Long>>();
-    RecordWriter<NullWritable, Person<Long>> writer = outputFormat.getRecordWriter(m_tacontext);
+    MneDurableOutputSession<Person<Long>> sess = new MneDurableOutputSession<Person<Long>>(m_tacontext);
+    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    sess.initNextPool();
+    MneDurableOutputValue<Person<Long>> mdvalue =
+        new MneDurableOutputValue<Person<Long>>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<Person<Long>>> outputFormat =
+        new MneOutputFormat<MneDurableOutputValue<Person<Long>>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<Person<Long>>> writer =
+        outputFormat.getRecordWriter(m_tacontext);
     Person<Long> person = null;
     for (int i = 0; i < m_reccnt; ++i) {
-      person = ((MneMapreduceRecordWriter<Person<Long>>) writer).newDurableObjectRecord();
+      person = sess.newDurableObjectRecord();
       person.setAge((short) m_rand.nextInt(50));
       person.setName(String.format("Name: [%s]", Utils.genRandomString()), true);
       m_sumage += person.getAge();
-      writer.write(nada, person);
+      writer.write(nada, mdvalue.of(person));
     }
     writer.close(m_tacontext);
+    sess.close();
   }
 
   @Test(enabled = true, dependsOnMethods = { "testWritePersonData" })
@@ -124,7 +133,7 @@ public class MneMapreduceIOTest {
     File[] listfiles = folder.listFiles();
     for (int idx = 0; idx < listfiles.length; ++idx) {
       if (listfiles[idx].isFile()
-          && listfiles[idx].getName().endsWith(MneConfigHelper.FILE_EXTENSION)) {
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
         System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
         FileSplit split = new FileSplit(
             new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/408625f5/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
index c5edd49..f61dd80 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/Person.java
@@ -17,11 +17,6 @@
 
 package org.apache.mnemonic.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
 import org.apache.mnemonic.Durable;
 import org.apache.mnemonic.EntityFactoryProxy;
 import org.apache.mnemonic.DurableEntity;
@@ -37,7 +32,7 @@ import org.apache.mnemonic.DurableType;
  */
 
 @DurableEntity
-public abstract class Person<E> implements Durable, Comparable<Person<E>>, Writable {
+public abstract class Person<E> implements Durable, Comparable<Person<E>> {
   E element;
 
   @Override
@@ -75,14 +70,6 @@ public abstract class Person<E> implements Durable, Comparable<Person<E>>, Writable
     return ret;
   }
 
-  public void write(DataOutput out) throws IOException {
-
-  }
-
-  public void readFields(DataInput in) throws IOException {
-
-  }
-
   @DurableGetter(Id = 1L)
   public abstract short getAge();
 


