mnemonic-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject incubator-mnemonic git commit: MNEMONIC-194: Extract core functions from MneMapreduceRecordReader
Date Thu, 16 Feb 2017 01:41:23 GMT
Repository: incubator-mnemonic
Updated Branches:
  refs/heads/master 408625f56 -> 66fe6c032


MNEMONIC-194: Extract core functions from MneMapreduceRecordReader


Project: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/commit/66fe6c03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/tree/66fe6c03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/diff/66fe6c03

Branch: refs/heads/master
Commit: 66fe6c03267edb7f2a2bb2a59f5b0df21eb0862f
Parents: 408625f
Author: Wang, Gang(Gary) <gang1.wang@intel.com>
Authored: Wed Feb 15 17:20:53 2017 -0800
Committer: Wang, Gang(Gary) <gang1.wang@intel.com>
Committed: Wed Feb 15 17:38:34 2017 -0800

----------------------------------------------------------------------
 .../mnemonic/hadoop/MneDurableInputSession.java | 141 +++++++++++++++++++
 .../mnemonic/hadoop/MneDurableInputValue.java   |  48 +++++++
 .../hadoop/mapreduce/MneInputFormat.java        |   9 +-
 .../mapreduce/MneMapreduceRecordReader.java     |  71 +++-------
 .../mnemonic/mapreduce/MneMapreduceIOTest.java  |  16 ++-
 5 files changed, 220 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/66fe6c03/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
new file mode 100644
index 0000000..71833ba
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputSession.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.mnemonic.ConfigurationException;
+import org.apache.mnemonic.DurableType;
+import org.apache.mnemonic.EntityFactoryProxy;
+import org.apache.mnemonic.NonVolatileMemAllocator;
+import org.apache.mnemonic.Utils;
+import org.apache.mnemonic.collections.DurableSinglyLinkedList;
+import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
+
+public class MneDurableInputSession<V> implements MneDurableComputable<NonVolatileMemAllocator> {
+
+  private TaskAttemptContext taskAttemptContext;
+  private String serviceName;
+  private DurableType[] durableTypes;
+  private EntityFactoryProxy[] entityFactoryProxies;
+  private long slotKeyId;
+
+  protected long m_handler;
+  protected NonVolatileMemAllocator m_act;
+  protected Iterator<V> m_iter;
+
+  public MneDurableInputSession(TaskAttemptContext taskAttemptContext) {
+    setTaskAttemptContext(taskAttemptContext);
+  }
+
+  public void validateConfig() {
+    if (getDurableTypes().length < 1) {
+      throw new ConfigurationException("The durable type of record parameters does not exist");
+    } else {
+      if (DurableType.DURABLE == getDurableTypes()[0]
+          && getEntityFactoryProxies().length < 1) { /* T.B.D. BUFFER & CHUNK */
+        throw new ConfigurationException("The durable entity proxy of record parameters does not exist");
+      }
+    }
+  }
+
+  public void readConfig(String prefix) {
+    if (getTaskAttemptContext() == null) {
+      throw new ConfigurationException("taskAttemptContext has not yet been set");
+    }
+    Configuration conf = getTaskAttemptContext().getConfiguration();
+    setServiceName(MneConfigHelper.getMemServiceName(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
+    setDurableTypes(MneConfigHelper.getDurableTypes(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
+    setEntityFactoryProxies(Utils.instantiateEntityFactoryProxies(
+        MneConfigHelper.getEntityFactoryProxies(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX)));
+    setSlotKeyId(MneConfigHelper.getSlotKeyId(conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
+    validateConfig();
+  }
+
+  public void initialize(Path path) {
+    DurableSinglyLinkedList<V> dsllist;
+    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(getServiceName()), 1024000L,
+        path.toString(), true);
+    m_handler = m_act.getHandler(getSlotKeyId());
+    dsllist = DurableSinglyLinkedListFactory.restore(m_act, getEntityFactoryProxies(), getDurableTypes(), m_handler,
+        false);
+    m_iter = dsllist.iterator();
+  }
+
+  public Iterator<V> iterator() {
+    return m_iter;
+  }
+
+  public void close() {
+    m_act.close();
+  }
+
+  public TaskAttemptContext getTaskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public void setTaskAttemptContext(TaskAttemptContext taskAttemptContext) {
+    this.taskAttemptContext = taskAttemptContext;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public DurableType[] getDurableTypes() {
+    return durableTypes;
+  }
+
+  public void setDurableTypes(DurableType[] durableTypes) {
+    this.durableTypes = durableTypes;
+  }
+
+  public EntityFactoryProxy[] getEntityFactoryProxies() {
+    return entityFactoryProxies;
+  }
+
+  public void setEntityFactoryProxies(EntityFactoryProxy[] entityFactoryProxies) {
+    this.entityFactoryProxies = entityFactoryProxies;
+  }
+
+  public long getSlotKeyId() {
+    return slotKeyId;
+  }
+
+  public void setSlotKeyId(long slotKeyId) {
+    this.slotKeyId = slotKeyId;
+  }
+
+  @Override
+  public NonVolatileMemAllocator getAllocator() {
+    return m_act;
+  }
+
+  @Override
+  public long getHandler() {
+    return m_handler;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/66fe6c03/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputValue.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputValue.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputValue.java
new file mode 100644
index 0000000..f1b11a7
--- /dev/null
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/MneDurableInputValue.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mnemonic.hadoop;
+
+public class MneDurableInputValue<V> {
+
+  protected MneDurableInputSession<V> m_session;
+  protected V m_value;
+
+  public MneDurableInputValue(MneDurableInputSession<V> sess) {
+    m_session = sess;
+  }
+
+  public MneDurableInputValue(MneDurableInputSession<V> sess, V value) {
+    m_session = sess;
+    m_value = value;
+  }
+
+  public MneDurableInputSession<V> getSession() {
+    return m_session;
+  }
+
+  public MneDurableInputValue<V> of(V value) {
+    m_value = value;
+    return this;
+  }
+
+  public V getValue() {
+    return m_value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/66fe6c03/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneInputFormat.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneInputFormat.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneInputFormat.java
index b03adcd..ebfa44a 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneInputFormat.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -32,8 +33,8 @@ import org.apache.hadoop.io.NullWritable;
 /**
  * A Mnemonic input format that satisfies the org.apache.hadoop.mapreduce API.
  */
-public class MneInputFormat<V>
-    extends FileInputFormat<NullWritable, V> {
+public class MneInputFormat<MV extends MneDurableInputValue<V>, V>
+    extends FileInputFormat<NullWritable, MV> {
 
   @Override
   protected boolean isSplitable(JobContext context, Path filename) {
@@ -41,11 +42,11 @@ public class MneInputFormat<V>
   }
 
   @Override
-  public RecordReader<NullWritable, V>
+  public RecordReader<NullWritable, MV>
       createRecordReader(InputSplit inputSplit,
                          TaskAttemptContext taskAttemptContext
                          ) throws IOException, InterruptedException {
-    MneMapreduceRecordReader<V> reader = new MneMapreduceRecordReader<V>();
+    MneMapreduceRecordReader<MV, V> reader = new MneMapreduceRecordReader<MV, V>();
     reader.initialize(inputSplit, taskAttemptContext);
     return reader;
   }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/66fe6c03/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
index f253cbc..8a2c599 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/main/java/org/apache/mnemonic/hadoop/mapreduce/MneMapreduceRecordReader.java
@@ -20,66 +20,38 @@ package org.apache.mnemonic.hadoop.mapreduce;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.mnemonic.DurableType;
-import org.apache.mnemonic.EntityFactoryProxy;
-import org.apache.mnemonic.NonVolatileMemAllocator;
-import org.apache.mnemonic.Utils;
-import org.apache.mnemonic.collections.DurableSinglyLinkedList;
-import org.apache.mnemonic.collections.DurableSinglyLinkedListFactory;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
-import org.apache.mnemonic.hadoop.MneDurableComputable;
+import org.apache.mnemonic.hadoop.MneDurableInputSession;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
 
 /**
  * This record reader implements the org.apache.hadoop.mapreduce API.
- * @param <V> the type of the data item
+ *
+ * @param <V>
+ *          the type of the data item
  */
-public class MneMapreduceRecordReader<V>
-    extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, V>
-    implements MneDurableComputable<NonVolatileMemAllocator> {
+public class MneMapreduceRecordReader<MV extends MneDurableInputValue<V>, V>
+    extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, MV> {
 
-  protected Configuration m_conf;
-  protected TaskAttemptContext m_context;
-  protected NonVolatileMemAllocator m_act;
   protected Iterator<V> m_iter;
-  protected long m_slotkeyid;
-  protected long m_handler = 0L;
-  protected DurableType[] m_gtypes;
-  protected EntityFactoryProxy[] m_efproxies;
-  protected String m_msvrname;
-
-  public MneMapreduceRecordReader() {
-  }
+  protected MneDurableInputSession<V> m_session;
 
   @Override
   public void close() throws IOException {
-    m_act.close();
+    m_session.close();
   }
 
   @Override
-  public void initialize(InputSplit inputSplit,
-                         TaskAttemptContext context) {
+  public void initialize(InputSplit inputSplit, TaskAttemptContext context) {
     FileSplit split = (FileSplit) inputSplit;
-    m_context = context;
-    m_conf = m_context.getConfiguration();
-    m_msvrname = MneConfigHelper.getMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-    m_gtypes = MneConfigHelper.getDurableTypes(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-    m_efproxies = Utils.instantiateEntityFactoryProxies(
-        MneConfigHelper.getEntityFactoryProxies(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX));
-    m_slotkeyid = MneConfigHelper.getSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
-    
-    DurableSinglyLinkedList<V> dsllist;
-
-    m_act = new NonVolatileMemAllocator(Utils.getNonVolatileMemoryAllocatorService(m_msvrname), 1024000L,
-        split.getPath().toString(), true);
-    m_handler = m_act.getHandler(m_slotkeyid);
-    dsllist = DurableSinglyLinkedListFactory.restore(m_act, m_efproxies, 
-        m_gtypes, m_handler, false);
-    m_iter = dsllist.iterator();
+    m_session = new MneDurableInputSession<V>(context);
+    m_session.readConfig(MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX);
+    m_session.initialize(split.getPath());
+    m_iter = m_session.iterator();
   }
 
   @Override
@@ -92,9 +64,10 @@ public class MneMapreduceRecordReader<V>
     return NullWritable.get();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public V getCurrentValue() throws IOException, InterruptedException {
-    return m_iter.next();
+  public MV getCurrentValue() throws IOException, InterruptedException {
+    return (MV) new MneDurableInputValue<V>(m_session, m_iter.next());
   }
 
   @Override
@@ -102,14 +75,4 @@ public class MneMapreduceRecordReader<V>
     return 0.5f; /* TBD */
   }
 
-  @Override
-  public NonVolatileMemAllocator getAllocator() {
-    return m_act;
-  }
-
-  @Override
-  public long getHandler() {
-    return m_handler;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-mnemonic/blob/66fe6c03/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
----------------------------------------------------------------------
diff --git a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
index 41ac8f5..df18367 100644
--- a/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
+++ b/mnemonic-hadoop/mnemonic-hadoop-mapreduce/src/test/java/org/apache/mnemonic/mapreduce/MneMapreduceIOTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.mnemonic.DurableType;
 import org.apache.mnemonic.Utils;
 import org.apache.mnemonic.hadoop.MneConfigHelper;
+import org.apache.mnemonic.hadoop.MneDurableInputValue;
 import org.apache.mnemonic.hadoop.MneDurableOutputSession;
 import org.apache.mnemonic.hadoop.MneDurableOutputValue;
 import org.apache.mnemonic.hadoop.mapreduce.MneInputFormat;
@@ -137,14 +138,15 @@ public class MneMapreduceIOTest {
         System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
         FileSplit split = new FileSplit(
             new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
-        InputFormat<NullWritable, Person<Long>> inputFormat = new MneInputFormat<Person<Long>>();
-        RecordReader<NullWritable, Person<Long>> reader = inputFormat.createRecordReader(
-            split, m_tacontext);
-        Person<Long> person = null;
+        InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
+            new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
+        RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
+            inputFormat.createRecordReader(split, m_tacontext);
+        MneDurableInputValue<Person<Long>> personval = null;
         while (reader.nextKeyValue()) {
-          person = reader.getCurrentValue();
-          AssertJUnit.assertTrue(person.getAge() < 51);
-          sumage += person.getAge();
+          personval = reader.getCurrentValue();
+          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
+          sumage += personval.getValue().getAge();
           ++reccnt;
         }
         reader.close();


Mime
View raw message