mnemonic-commits mailing list archives

From ga...@apache.org
Subject incubator-mnemonic git commit: MNEMONIC-197: Add Managed ByteBuffer access support MNEMONIC-198: Add Managed data chunk access support
Date Tue, 28 Feb 2017 20:57:05 GMT
Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 8d861f77e -> be1ef9ff9


MNEMONIC-197: Add Managed ByteBuffer access support
MNEMONIC-198: Add Managed data chunk access support
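
For illustration, a minimal write-side sketch of the new managed-buffer path, adapted from the
MneMapreduceBufferDataTest added below (the TaskAttemptContext ctx and the 1 MB size are
placeholders, not part of this patch):

    MneDurableOutputSession<DurableBuffer<?>> sess = new MneDurableOutputSession<DurableBuffer<?>>(ctx);
    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
    sess.initNextPool();
    DurableBuffer<?> dbuf = sess.newDurableObjectRecord(1024L * 1024); // sized allocation for the BUFFER type
    if (null != dbuf) {
      dbuf.get().clear();                     // get() exposes the backing ByteBuffer
      dbuf.get().put(new byte[1024 * 1024]);  // fill the managed buffer
      sess.post(dbuf);                        // link the record into the durable list
    }
    sess.close();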


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/be1ef9ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/be1ef9ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/be1ef9ff

Branch: refs/heads/master
Commit: be1ef9ff964d589803419e32c14910930064f78d
Parents: 8d861f7
Author: Wang, Gang(Gary) <gang1.wang@intel.com>
Authored: Mon Feb 27 11:05:27 2017 -0800
Committer: Wang, Gang(Gary) <gang1.wang@intel.com>
Committed: Tue Feb 28 12:48:55 2017 -0800

----------------------------------------------------------------------
 mnemonic-core/pom.xml                           |   1 -
 .../mnemonic-hadoop-mapreduce/pom.xml           |   4 +
 .../mnemonic/hadoop/MneDurableInputSession.java |   7 +-
 .../hadoop/MneDurableOutputSession.java         |  68 ++++--
 .../apache/mnemonic/hadoop/MneInputSession.java |  32 +++
 .../mnemonic/hadoop/MneOutputSession.java       |  29 +++
 .../mapreduce/MneMapreduceBufferDataTest.java   | 203 ++++++++++++++++++
 .../mapreduce/MneMapreduceChunkDataTest.java    | 207 +++++++++++++++++++
 pom.xml                                         |   5 +
 9 files changed, 542 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-core/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-core/pom.xml b/mnemonic-core/pom.xml
index 95e6260..9f2cefb 100644
--- a/mnemonic-core/pom.xml
+++ b/mnemonic-core/pom.xml
@@ -35,7 +35,6 @@
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
-      <version>3.4</version>
     </dependency>
     <dependency>
       <groupId>org.testng</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/pom.xml b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/pom.xml
index 8273fcf..7d79fdf 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/pom.xml
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/pom.xml
@@ -38,6 +38,10 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
index 71833ba..81c1373 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -31,7 +31,8 @@ import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.collections.DurableSinglyLinkedList;
 import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 
-public class MneDurableInputSession<V> implements MneDurableComputable<NonVolatileMemAllocator> {
+public class MneDurableInputSession<V>
+    implements MneInputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
 
   private TaskAttemptContext taskAttemptContext;
   private String serviceName;
@@ -58,6 +59,7 @@ public class MneDurableInputSession<V> implements MneDurableComputable<NonVolati
     }
   }
 
+  @Override
   public void readConfig(String prefix) {
     if (getTaskAttemptContext() == null) {
       throw new ConfigurationException("taskAttemptContext has not yet been set");
@@ -71,6 +73,7 @@ public class MneDurableInputSession<V> implements MneDurableComputable<NonVolati
     validateConfig();
   }
 
+  @Override
   public void initialize(Path path) {
     DurableSinglyLinkedList<V> dsllist;
     m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
@@ -81,10 +84,12 @@ public class MneDurableInputSession<V> implements MneDurableComputable<NonVolati
     m_iter = dsllist.iterator();
   }
 
+  @Override
   public Iterator<V> iterator() {
     return m_iter;
   }
 
+  @Override
   public void close() {
     m_act.close();
   }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
index c3ee0a2..9d40756 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableOutputSession.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class MneDurableOutputSession<V>
-    implements MneDurableComputable<NonVolatileMemAllocator> {
+    implements MneOutputSession<V>, MneDurableComputable<NonVolatileMemAllocator> {
 
   private long poolSize;
   private TaskAttemptContext taskAttemptContext;
@@ -73,7 +73,8 @@ public class MneDurableOutputSession<V>
       }
     }
   }
-  
+
+  @Override
   public void readConfig(String prefix) {
     if (getTaskAttemptContext() == null) {
       throw new ConfigurationException("taskAttemptContext has not yet been set");
@@ -97,6 +98,7 @@ public class MneDurableOutputSession<V>
     return ret;
   }
 
+  @Override
   public void initNextPool() {
     if (m_act != null) {
       m_act.close();
@@ -122,21 +124,53 @@ public class MneDurableOutputSession<V>
   }
 
   @SuppressWarnings("unchecked")
-  protected V createDurableObjectRecord() {
+  protected V createDurableObjectRecord(long size) {
     V ret = null;
-    if (getDurableTypes()[0] == DurableType.DURABLE) {
+    switch (getDurableTypes()[0]) {
+    case DURABLE:
+      ret = (V) getEntityFactoryProxies()[0].create(m_act,
+          m_recparmpair.getRight(), m_recparmpair.getLeft(), false);
+      break;
+    case BUFFER:
+      if (size > 0) {
+        ret = (V)m_act.createBuffer(size);
+        if (null == ret) {
+          throw new OutOfHybridMemory("Allocate a buffer failed");
+        }
+      }
+      break;
+    case CHUNK:
+      if (size > 0) {
+        ret = (V)m_act.createChunk(size);
+        if (null == ret) {
+          throw new OutOfHybridMemory("Allocate a chunk failed");
+        }
+      }
+      break;
+    default:
+      break;
     }
     return ret;
   }
 
   public V newDurableObjectRecord() {
+    return newDurableObjectRecord(-1L);
+  }
+
+  /**
+   * Create a durable object record.
+   *
+   * @param size
+   *        the size of the buffer or chunk to allocate
+   *
+   * @return the new record, or null if size <= 0 for the buffer/chunk types
+   *
+   * @throws OutOfHybridMemory
+   *        if the allocation cannot be satisfied even after switching to a new pool
+   */
+  public V newDurableObjectRecord(long size) {
     V ret = null;
     DurableSinglyLinkedList<V> nv = null;
     try {
       nv = createDurableNode();
-      ret = createDurableObjectRecord();
+      ret = createDurableObjectRecord(size);
     } catch (OutOfHybridMemory e) {
       if (nv != null) {
         nv.destroy();
@@ -147,7 +181,7 @@ public class MneDurableOutputSession<V>
       initNextPool();
       try { /* retry */
         nv = createDurableNode();
-        ret = createDurableObjectRecord();
+        ret = createDurableObjectRecord(size);
       } catch (OutOfHybridMemory ee) {
         if (nv != null) {
           nv.destroy();
@@ -157,8 +191,12 @@ public class MneDurableOutputSession<V>
         }
       }
     }
-    if (ret != null) {
+    if (null != ret) {
       m_recordmap.put(ret, nv);
+    } else {
+      if (null != nv) {
+        nv.destroy();
+      }
     }
     return ret;
   }
@@ -169,28 +207,33 @@ public class MneDurableOutputSession<V>
     return ret;
   }
 
+  @Override
   public void post(V v) {
     DurableSinglyLinkedList<V> nv = null;
     if (null == v) {
       return;
     }
-    if (DurableType.DURABLE == getDurableTypes()[0]) {
+    switch (getDurableTypes()[0]) {
+    case DURABLE:
+    case BUFFER:
+    case CHUNK:
       if (m_recordmap.containsKey(v)) {
         nv = m_recordmap.remove(v);
       } else {
         throw new RuntimeException("The record hasn't been created by newDurableObjectRecord()");
       }
-    } else {
+      break;
+    default:
       try {
         nv = createDurableNode();
       } catch (OutOfHybridMemory e) {
         initNextPool();
         nv = createDurableNode();
       }
+      break;
     }
-    if (nv != null) {
-      nv.setItem(v, false);
-    }
+    assert null != nv;
+    nv.setItem(v, false);
     if (m_newpool) {
       m_act.setHandler(getSlotKeyId(), nv.getHandler());
       m_newpool = false;
@@ -213,6 +256,7 @@ public class MneDurableOutputSession<V>
     }
   }
 
+  @Override
   public void close() {
     destroyAllPendingRecords();
     m_act.close();
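
A sketch of the sized-allocation convention introduced above, here for a CHUNK-typed record
(sess is an already-configured MneDurableOutputSession, unsafe comes from Utils.getUnsafe() as in
the chunk test below, and len is assumed positive):

    DurableChunk<?> chunk = sess.newDurableObjectRecord(len);   // returns null when len <= 0
    if (null != chunk) {
      for (long i = 0; i < chunk.getSize(); ++i) {
        unsafe.putByte(chunk.get() + i, (byte) 0);  // chunk.get() is the base address of the chunk
      }
      sess.post(chunk);   // records that are never post()ed are destroyed by close()
    }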

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneInputSession.java
new file mode 100644
index 0000000..cb0d42d
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneInputSession.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+
+public interface MneInputSession<V> extends Closeable {
+
+  void readConfig(String prefix);
+  void initialize(Path path);
+  Iterator<V> iterator();
+
+}
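
A read-side sketch written against just this interface (how the concrete MneDurableInputSession
is constructed is outside this patch, so the session is taken as a parameter here):

    static long countRecords(MneInputSession<DurableBuffer<?>> in, Path pool) throws IOException {
      long cnt = 0L;
      in.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
      in.initialize(pool);                       // attach to the memory pool file behind the Path
      for (Iterator<DurableBuffer<?>> it = in.iterator(); it.hasNext(); it.next()) {
        ++cnt;                                   // each element is a managed record
      }
      in.close();
      return cnt;
    }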

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneOutputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneOutputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneOutputSession.java
new file mode 100644
index 0000000..b55fdee
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneOutputSession.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import java.io.Closeable;
+
+public interface MneOutputSession<V> extends Closeable {
+
+  void readConfig(String prefix);
+  void initNextPool();
+  void post(V v);
+
+}
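
Since both new session interfaces extend java.io.Closeable, a session can also be scoped with
try-with-resources; a sketch assuming the TaskAttemptContext-based constructor used by the tests:

    try (MneDurableOutputSession<DurableChunk<?>> sess =
             new MneDurableOutputSession<DurableChunk<?>>(taskAttemptContext)) {
      sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
      sess.initNextPool();
      DurableChunk<?> chunk = sess.newDurableObjectRecord(4096L);
      if (null != chunk) {
        sess.post(chunk);
      }
    }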

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
new file mode 100644
index 0000000..e29a8c8
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceBufferDataTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.mnemonic.DurableBuffer;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MneMapreduceBufferDataTest {
+
+  private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "buffer-data";
+  private static final String SERVICE_NAME = "pmalloc";
+  private static final long SLOT_KEY_ID = 5L;
+  private Path m_workdir;
+  private JobConf m_conf;
+  private FileSystem m_fs;
+  private Random m_rand;
+  private TaskAttemptID m_taid;
+  private TaskAttemptContext m_tacontext;
+  private long m_reccnt = 5000L;
+  private volatile long m_checksum;
+  private volatile long m_totalsize = 0L;
+  private List<String> m_partfns;
+
+  @BeforeClass
+  public void setUp() throws IOException {
+    m_workdir = new Path(
+        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+    m_conf = new JobConf();
+    m_rand = Utils.createRandom();
+    m_partfns = new ArrayList<String>();
+
+    try {
+      m_fs = FileSystem.getLocal(m_conf).getRaw();
+      m_fs.delete(m_workdir, true);
+      m_fs.mkdirs(m_workdir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+
+    m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
+
+    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setBaseOutputName(m_conf, null, "buffer-data");
+
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {});
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setMemPoolSize(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.BUFFER});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {});
+  }
+
+  @AfterClass
+  public void tearDown() {
+
+  }
+
+  protected DurableBuffer<?> genupdDurableBuffer(
+      MneDurableOutputSession<DurableBuffer<?>> s, Checksum cs) {
+    DurableBuffer<?> ret = null;
+    int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024;
+    ret = s.newDurableObjectRecord(sz);
+    if (null != ret) {
+      ret.get().clear();
+      byte[] rdbytes = RandomUtils.nextBytes(sz);
+      Assert.assertNotNull(rdbytes);
+      ret.get().put(rdbytes);
+      cs.update(rdbytes, 0, rdbytes.length);
+      m_totalsize += sz;
+    }
+    return ret;
+  }
+
+  @Test(enabled = true)
+  public void testWriteBufferData() throws Exception {
+    NullWritable nada = NullWritable.get();
+    MneDurableOutputSession<DurableBuffer<?>> sess = new MneDurableOutputSession<DurableBuffer<?>>(m_tacontext);
+    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    sess.initNextPool();
+    MneDurableOutputValue<DurableBuffer<?>> mdvalue =
+        new MneDurableOutputValue<DurableBuffer<?>>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> outputFormat =
+        new MneOutputFormat<MneDurableOutputValue<DurableBuffer<?>>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<DurableBuffer<?>>> writer =
+        outputFormat.getRecordWriter(m_tacontext);
+    DurableBuffer<?> dbuf = null;
+    Checksum cs = new CRC32();
+    cs.reset();
+    for (int i = 0; i < m_reccnt; ++i) {
+      dbuf = genupdDurableBuffer(sess, cs);
+      Assert.assertNotNull(dbuf);
+      writer.write(nada, mdvalue.of(dbuf));
+    }
+    m_checksum = cs.getValue();
+    writer.close(m_tacontext);
+    sess.close();
+  }
+
+  @Test(enabled = true, dependsOnMethods = { "testWriteBufferData" })
+  public void testReadBufferData() throws Exception {
+    long reccnt = 0L;
+    long tsize = 0L;
+    byte[] buf;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles();
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        m_partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(m_partfns); // keep the order for checksum
+    for (int idx = 0; idx < m_partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+      FileSplit split = new FileSplit(
+          new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+      InputFormat<NullWritable, MneDurableInputValue<DurableBuffer<?>>> inputFormat =
+          new MneInputFormat<MneDurableInputValue<DurableBuffer<?>>, DurableBuffer<?>>();
+      RecordReader<NullWritable, MneDurableInputValue<DurableBuffer<?>>> reader =
+          inputFormat.createRecordReader(split, m_tacontext);
+      MneDurableInputValue<DurableBuffer<?>> dbufval = null;
+      while (reader.nextKeyValue()) {
+        dbufval = reader.getCurrentValue();
+        assert dbufval.getValue().getSize() == dbufval.getValue().get().capacity();
+        dbufval.getValue().get().clear();
+        buf = new byte[dbufval.getValue().get().capacity()];
+        dbufval.getValue().get().get(buf);
+        cs.update(buf, 0, buf.length);
+        tsize += dbufval.getValue().getSize();
+        ++reccnt;
+      }
+      reader.close();
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of buffer is %d", m_checksum));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
new file mode 100644
index 0000000..1dfe7ad
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceChunkDataTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.mnemonic.DurableChunk;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
+import org.apache.mnemonic.hadoop.MneDurableOutputSession;
+import org.apache.mnemonic.hadoop.MneDurableOutputValue;
+import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
+import org.apache.mnemonic.hadoop.mapreduce.MneOutputFormat;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import sun.misc.Unsafe;
+
+@SuppressWarnings("restriction")
+public class MneMapreduceChunkDataTest {
+
+  private static final String DEFAULT_BASE_WORK_DIR = "target" + File.separator + "test" + File.separator + "tmp";
+  private static final String DEFAULT_WORK_DIR = DEFAULT_BASE_WORK_DIR + File.separator + "chunk-data";
+  private static final String SERVICE_NAME = "pmalloc";
+  private static final long SLOT_KEY_ID = 5L;
+  private Path m_workdir;
+  private JobConf m_conf;
+  private FileSystem m_fs;
+  private Random m_rand;
+  private TaskAttemptID m_taid;
+  private TaskAttemptContext m_tacontext;
+  private long m_reccnt = 5000L;
+  private volatile long m_checksum;
+  private volatile long m_totalsize = 0L;
+  private List<String> m_partfns;
+  private Unsafe unsafe;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    m_workdir = new Path(
+        System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
+    m_conf = new JobConf();
+    m_rand = Utils.createRandom();
+    m_partfns = new ArrayList<String>();
+    unsafe = Utils.getUnsafe();
+
+    try {
+      m_fs = FileSystem.getLocal(m_conf).getRaw();
+      m_fs.delete(m_workdir, true);
+      m_fs.mkdirs(m_workdir);
+    } catch (IOException e) {
+      throw new IllegalStateException("bad fs init", e);
+    }
+
+    m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
+    m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
+
+    m_conf.set("mapreduce.output.fileoutputformat.outputdir", m_workdir.toString());
+    MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
+
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[] {DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[] {});
+    MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
+    MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
+    MneConfigHelper.setMemPoolSize(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
+    MneConfigHelper.setDurableTypes(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[] {DurableType.CHUNK});
+    MneConfigHelper.setEntityFactoryProxies(m_conf,
+        MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[] {});
+  }
+
+  @AfterClass
+  public void tearDown() {
+
+  }
+
+  protected DurableChunk<?> genupdDurableChunk(
+      MneDurableOutputSession<DurableChunk<?>> s, Checksum cs) {
+    DurableChunk<?> ret = null;
+    int sz = m_rand.nextInt(1024 * 1024) + 1024 * 1024;
+    ret = s.newDurableObjectRecord(sz);
+    byte b;
+    if (null != ret) {
+      for (int i = 0; i < ret.getSize(); ++i) {
+        b = (byte) m_rand.nextInt(255);
+        unsafe.putByte(ret.get() + i, b);
+        cs.update(b);
+      }
+      m_totalsize += sz;
+    }
+    return ret;
+  }
+
+  @Test(enabled = true)
+  public void testWriteChunkData() throws Exception {
+    NullWritable nada = NullWritable.get();
+    MneDurableOutputSession<DurableChunk<?>> sess = new MneDurableOutputSession<DurableChunk<?>>(m_tacontext);
+    sess.readConfig(MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX);
+    sess.initNextPool();
+    MneDurableOutputValue<DurableChunk<?>> mdvalue =
+        new MneDurableOutputValue<DurableChunk<?>>(sess);
+    OutputFormat<NullWritable, MneDurableOutputValue<DurableChunk<?>>> outputFormat =
+        new MneOutputFormat<MneDurableOutputValue<DurableChunk<?>>>();
+    RecordWriter<NullWritable, MneDurableOutputValue<DurableChunk<?>>> writer =
+        outputFormat.getRecordWriter(m_tacontext);
+    DurableChunk<?> dchunk = null;
+    Checksum cs = new CRC32();
+    cs.reset();
+    for (int i = 0; i < m_reccnt; ++i) {
+      dchunk = genupdDurableChunk(sess, cs);
+      Assert.assertNotNull(dchunk);
+      writer.write(nada, mdvalue.of(dchunk));
+    }
+    m_checksum = cs.getValue();
+    writer.close(m_tacontext);
+    sess.close();
+  }
+
+  @Test(enabled = true, dependsOnMethods = { "testWriteChunkData" })
+  public void testReadChunkData() throws Exception {
+    long reccnt = 0L;
+    long tsize = 0L;
+    Checksum cs = new CRC32();
+    cs.reset();
+    File folder = new File(m_workdir.toString());
+    File[] listfiles = folder.listFiles();
+    for (int idx = 0; idx < listfiles.length; ++idx) {
+      if (listfiles[idx].isFile()
+          && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
+          && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
+        m_partfns.add(listfiles[idx].getName());
+      }
+    }
+    Collections.sort(m_partfns); // keep the order for checksum
+    for (int idx = 0; idx < m_partfns.size(); ++idx) {
+      System.out.println(String.format("Verifying : %s", m_partfns.get(idx)));
+      FileSplit split = new FileSplit(
+          new Path(m_workdir, m_partfns.get(idx)), 0, 0L, new String[0]);
+      InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
+          new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
+      RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
+          inputFormat.createRecordReader(split, m_tacontext);
+      MneDurableInputValue<DurableChunk<?>> dchkval = null;
+      while (reader.nextKeyValue()) {
+        dchkval = reader.getCurrentValue();
+        byte b;
+        for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
+          b = unsafe.getByte(dchkval.getValue().get() + j);
+          cs.update(b);
+        }
+        tsize += dchkval.getValue().getSize();
+        ++reccnt;
+      }
+      reader.close();
+    }
+    AssertJUnit.assertEquals(m_reccnt, reccnt);
+    AssertJUnit.assertEquals(m_totalsize, tsize);
+    AssertJUnit.assertEquals(m_checksum, cs.getValue());
+    System.out.println(String.format("The checksum of chunk is %d", m_checksum));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/be1ef9ff/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 32943f7..cce5f15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,6 +124,11 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>3.4</version>
+      </dependency>
+      <dependency>
         <groupId>org.testng</groupId>
         <artifactId>testng</artifactId>
         <version>6.8.17</version>

