ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [22/25] ignite git commit: IGNITE-3411: Hadoop: Implemented Hadoop attributes.
Date Fri, 01 Jul 2016 13:00:10 GMT
IGNITE-3411: Hadoop: Implemented Hadoop attributes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b411606b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b411606b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b411606b

Branch: refs/heads/ignite-3154
Commit: b411606b292e16edbf16df7522652846696ac364
Parents: cc7ace9
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Jul 1 15:22:39 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Jul 1 15:22:39 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopAttributes.java     | 168 +++++++++++++++++++
 .../processors/hadoop/HadoopProcessor.java      |  49 +++---
 2 files changed, 194 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b411606b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
new file mode 100644
index 0000000..23eaa18
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopAttributes.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * Hadoop attributes of a node, looked up through the node attribute {@link #NAME}.
+ */
+public class HadoopAttributes implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Name of the node attribute that holds an instance of this class. */
+    public static final String NAME = IgniteNodeAttributes.ATTR_PREFIX + ".hadoop";
+
+    /** Class name of the configured map-reduce planner. */
+    private String plannerCls;
+
+    /** External executor flag; set to {@code false} when built from configuration. */
+    private boolean extExec;
+
+    /** Maximum number of parallel tasks, as read from configuration. */
+    private int maxParallelTasks;
+
+    /** Maximum task queue size, as read from configuration. */
+    private int maxTaskQueueSize;
+
+    /** Native library names, as read from configuration. */
+    @GridToStringExclude
+    private String[] libNames;
+
+    /** Number of processors available to the JVM when attributes were created from configuration. */
+    private int cores;
+
+    /**
+     * Reads the {@link #NAME} attribute of the given node (if any).
+     *
+     * @param node Node to read the attribute from.
+     * @return Attributes or {@code null} if Hadoop Accelerator is not enabled for node.
+     */
+    @Nullable public static HadoopAttributes forNode(ClusterNode node) {
+        return node.attribute(NAME);
+    }
+
+    /**
+     * Default public constructor required by {@link Externalizable}.
+     */
+    public HadoopAttributes() {
+        // No-op.
+    }
+
+    /**
+     * Creates attributes from the given Hadoop configuration.
+     *
+     * @param cfg Configuration; asserted to be non-null and to have a non-null map-reduce planner.
+     */
+    public HadoopAttributes(HadoopConfiguration cfg) {
+        assert cfg != null;
+        assert cfg.getMapReducePlanner() != null;
+
+        plannerCls = cfg.getMapReducePlanner().getClass().getName();
+
+        // TODO: IGNITE-404: Get from configuration when fixed.
+        extExec = false;
+
+        maxParallelTasks = cfg.getMaxParallelTasks();
+        maxTaskQueueSize = cfg.getMaxTaskQueueSize();
+        libNames = cfg.getNativeLibraryNames();
+
+        // Cores count is already passed in other attributes; it is duplicated here for convenience.
+        cores = Runtime.getRuntime().availableProcessors();
+    }
+
+    /**
+     * @return Map-reduce planner class name.
+     */
+    public String plannerClassName() {
+        return plannerCls;
+    }
+
+    /**
+     * @return External execution flag.
+     */
+    public boolean externalExecution() {
+        return extExec;
+    }
+
+    /**
+     * @return Maximum number of parallel tasks.
+     */
+    public int maxParallelTasks() {
+        return maxParallelTasks;
+    }
+
+    /**
+     * @return Maximum task queue size.
+     */
+    public int maxTaskQueueSize() {
+        return maxTaskQueueSize;
+    }
+
+
+    /**
+     * @return Native library names (the stored array itself, not a copy).
+     */
+    public String[] nativeLibraryNames() {
+        return libNames;
+    }
+
+    /**
+     * @return Number of cores on machine.
+     */
+    public int cores() {
+        return cores;
+    }
+
+    /** {@inheritDoc} Writes fields in the order read back by {@link #readExternal(ObjectInput)}. */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(plannerCls);
+        out.writeBoolean(extExec);
+        out.writeInt(maxParallelTasks);
+        out.writeInt(maxTaskQueueSize);
+        out.writeObject(libNames);
+        out.writeInt(cores);
+    }
+
+    /** {@inheritDoc} Reads fields in the order written by {@link #writeExternal(ObjectOutput)}. */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
+        plannerCls = (String)in.readObject();
+        extExec = in.readBoolean();
+        maxParallelTasks = in.readInt();
+        maxTaskQueueSize = in.readInt();
+        libNames = (String[])in.readObject();
+        cores = in.readInt();
+    }
+
+    /** {@inheritDoc} Passes {@code libNames}, rendered via {@code Arrays.toString}, as an additional argument. */
+    @Override public String toString() {
+        return S.toString(HadoopAttributes.class, this, "libNames", Arrays.toString(libNames));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b411606b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
index 7fc7499..bb10565 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -17,10 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.configuration.HadoopConfiguration;
 import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
@@ -34,6 +30,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Hadoop processor.
  */
@@ -85,16 +86,24 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
             c.start(hctx);
 
         hadoop = new HadoopImpl(this);
+
+        ctx.addNodeAttribute(HadoopAttributes.NAME, new HadoopAttributes(cfg));
     }
 
     /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopProcessor.class, this);
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        if (hctx == null)
+            return;
+
+        for (HadoopComponent c : hctx.components())
+            c.onKernalStart();
     }
 
     /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
 
         if (hctx == null)
             return;
@@ -104,24 +113,13 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
         for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
             HadoopComponent c = it.previous();
 
-            c.stop(cancel);
+            c.onKernalStop(cancel);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (hctx == null)
-            return;
-
-        for (HadoopComponent c : hctx.components())
-            c.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
 
         if (hctx == null)
             return;
@@ -131,7 +129,7 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
         for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
             HadoopComponent c = it.previous();
 
-            c.onKernalStop(cancel);
+            c.stop(cancel);
         }
     }
 
@@ -217,4 +215,9 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
         if (cfg.getMapReducePlanner() == null)
             cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
     }
+
+    /** {@inheritDoc} Delegates to {@code S.toString} for {@code HadoopProcessor}. */
+    @Override public String toString() {
+        return S.toString(HadoopProcessor.class, this);
+    }
 }
\ No newline at end of file


Mime
View raw message