hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chl...@apache.org
Subject hama git commit: Add RPC service.
Date Sat, 16 Jun 2018 18:25:51 GMT
Repository: hama
Updated Branches:
  refs/heads/componentization 98a052b86 -> 7dfe85e3f


Add RPC service.


Project: http://git-wip-us.apache.org/repos/asf/hama/repo
Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/7dfe85e3
Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/7dfe85e3
Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/7dfe85e3

Branch: refs/heads/componentization
Commit: 7dfe85e3fdaaaa19c429c03393f56204623a0b5b
Parents: 98a052b
Author: ChiaHung Lin <chl501@apache.org>
Authored: Sat Jun 16 20:20:53 2018 +0200
Committer: ChiaHung Lin <chl501@apache.org>
Committed: Sat Jun 16 20:20:53 2018 +0200

----------------------------------------------------------------------
 .../scala/org/apache/hama/conf/Setting.scala    |  12 ++
 .../scala/org/apache/hama/logging/Logging.scala |   2 +-
 .../org/apache/hama/master/Communicator.scala   | 150 ++++++++++++++++++-
 .../apache/hama/master/IdentifierProvider.scala |  33 ++++
 .../scala/org/apache/hama/master/Master.scala   |  26 ++++
 .../org/apache/hama/master/Scheduler.scala      |  15 +-
 6 files changed, 231 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/conf/Setting.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/conf/Setting.scala b/core/src/main/scala/org/apache/hama/conf/Setting.scala
index 7145dd1..a415521 100644
--- a/core/src/main/scala/org/apache/hama/conf/Setting.scala
+++ b/core/src/main/scala/org/apache/hama/conf/Setting.scala
@@ -21,11 +21,13 @@ import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 import com.typesafe.config.ConfigValue
 import com.typesafe.config.ConfigValueFactory
+import org.apache.hama.HamaConfiguration
 import org.apache.hama.logging.Logging
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
 import scala.reflect.ClassTag
+import scala.collection.JavaConverters._
 
 object Setting {
   
@@ -39,6 +41,8 @@ trait Setting extends Logging {
   def get[T: ClassTag](key: String): Option[T]
 
   def set[T](key: String, default: T): Setting
+  
+  def hama(): HamaConfiguration
 
 }
 
@@ -55,4 +59,12 @@ protected[conf] class DefaultSetting(config: Config) extends Setting {
     config.withValue(key, ConfigValueFactory.fromAnyRef(key))
   )
   
+  override def hama(): HamaConfiguration = 
+    config.entrySet.asScala.foldLeft(new HamaConfiguration) { case (c, e) =>
+      val key = e.getKey
+      val value = e.getValue
+      c.set(key, value.toString)
+      c
+    }
+  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/logging/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/logging/Logging.scala b/core/src/main/scala/org/apache/hama/logging/Logging.scala
index b02341a..c971d2a 100644
--- a/core/src/main/scala/org/apache/hama/logging/Logging.scala
+++ b/core/src/main/scala/org/apache/hama/logging/Logging.scala
@@ -20,7 +20,7 @@ package org.apache.hama.logging
 import org.slf4j.LoggerFactory
 import java.io.Serializable
 
-trait Logging extends Serializable {
+trait Logging {
 
   val log = LoggerFactory.getLogger(getClass)
   

http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Communicator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/master/Communicator.scala b/core/src/main/scala/org/apache/hama/master/Communicator.scala
index 4eaae31..bf28c5c 100644
--- a/core/src/main/scala/org/apache/hama/master/Communicator.scala
+++ b/core/src/main/scala/org/apache/hama/master/Communicator.scala
@@ -17,14 +17,41 @@
  */
 package org.apache.hama.master
 
-import org.apache.hama.util.Utils._
+import java.io.IOException
+import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.Callable
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.Executors
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path
+import org.apache.hama.bsp.Directive
+import org.apache.hama.bsp.DirectiveHandler
+import org.apache.hama.bsp.GroomServerStatus
+import org.apache.hama.bsp.GroomStatusListener
+import org.apache.hama.bsp.ReportGroomStatusDirective
+import org.apache.hama.conf.Setting
+import org.apache.hama.ipc.GroomProtocol
+import org.apache.hama.ipc.HamaRPCProtocolVersion
+import org.apache.hama.ipc.JobSubmissionProtocol
+import org.apache.hama.ipc.MasterProtocol
 import org.apache.hama.ipc.RPC
+import org.apache.hama.logging.Logging
+import org.apache.hama.util.Utils._
+import scala.collection.JavaConverters._
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
 
+// TODO: move to net package
 case class Address(host: String, port: Int) {
 
   require(!isEmpty(host), s"Host is not provided!")  
 
-  require(isValidPort(port), s"Invalid port range!")  
+  require(isValidPort(port), s"Invalid port range: $port!")  
+  
+  def inet: InetSocketAddress = new InetSocketAddress(host, port)
 
 }
 
@@ -32,16 +59,129 @@ trait Communicator {
   
   def address(): Address
   
+  def start(): Boolean
+  
+  def stop(): Boolean
+  
 }
 
 object Communicator {
   
-  def create(host: String, port: Int): Communicator = 
-    new DefaultCommunicator(Address(host, port))  
+  @deprecated
+  class Instructor extends Callable[Boolean] with Logging {
+    protected[master] val queue = new LinkedBlockingQueue[Directive]
+    protected[master] val handlers = new ConcurrentHashMap[Directive, DirectiveHandler]
+
+    def bind(instruction: Directive, handler: DirectiveHandler) = 
+      handlers.putIfAbsent(instruction, handler)
+
+    def put(directive: Directive) = Try(queue.put(directive)) match {
+      case Success(_) => true
+      case Failure(ex) => false
+    }
+
+    @throws(classOf[Exception])
+    override def call(): Boolean = {
+      while(true) { Try {  
+        val directive = queue.take
+        handlers.get(directive.getClass).handle(directive)
+      } match {
+        case Success(_) => 
+        case Failure(ex) => ex match {
+          case _: InterruptedException => Thread.currentThread().interrupt
+          case _: Exception => log.error(s"Failing executing command because $ex")
+          
+        }
+          
+      }}
+      true
+    }
+    
+  }
+
+  // TODO: 
+  def create(host: String, port: Int)(implicit setting: Setting, fs: FileSystem): Communicator
= 
+    new RPCService(Address(host, port), setting, fs)  
   
 }
-protected[master] class DefaultCommunicator(addr: Address) extends Communicator {
+@deprecated
+protected[master] class RPCService(addr: Address, setting: Setting, fs: FileSystem) 
+    extends Communicator with MasterProtocol with Logging {
+  
+  import Communicator._
+  
+  require(null != setting, "Setting is not provided!")
+  require(null != fs, "FileSystem is not provided!")
+  
+  val executor = Executors.newSingleThreadExecutor
+  
+  protected[master] val grooms = new ConcurrentHashMap[GroomServerStatus, GroomProtocol]
 
+  protected[master] val listeners = new CopyOnWriteArrayList[GroomStatusListener]
+  protected[master] lazy val rpc = RPC.getServer(
+    this, address.host, address.port, setting.hama
+  )
+  protected[master] lazy val instructor = { 
+    val ins = new Instructor      
+    /* TODO: 
+    instructor.bind(
+      classOf[ReportGroomStatusDirective].asInstanceOf[Directive],
+      /new ReportGroomStatusHandler()
+    )
+    */
+    ins
+  }
 
   override def address(): Address = addr
+    
+  def add(listener: GroomStatusListener) = listeners.add(listener)
+
+  def remove(listener: GroomStatusListener) = listeners.remove(listener)
+
+  override def register(status: GroomServerStatus): Boolean = {
+    require(null != status, "GroomServerStatus is missing!")
+    Try(RPC.waitForProxy(
+      classOf[GroomProtocol], 
+      HamaRPCProtocolVersion.versionID, 
+      address.inet,
+      setting.hama
+    ).asInstanceOf[GroomProtocol]) match {
+      case Success(groom) => 
+        grooms.putIfAbsent(status, groom)
+        listeners.asScala.foreach(_.groomServerRegistered(status))
+        true 
+      case Failure(ex) => 
+        log.error(s"Fail registering GroomServer because $ex")
+        false
+    }
+  }
+
+  @throws(classOf[IOException])
+  override def report(directive: Directive): Boolean = 
+    instructor.put(directive)
+  
+  override def getSystemDir(): String = {
+    val sysDir = new Path(setting.hama.get(
+      "bsp.system.dir", "/tmp/hadoop/bsp/system"
+    ))
+    fs.makeQualified(sysDir).toString()
+  }
+  
+  @throws(classOf[IOException])
+  override def getProtocolVersion(protocol: String, clientVersion: Long) = 
+    if (protocol.equals(classOf[MasterProtocol].getName)) 
+      HamaRPCProtocolVersion.versionID
+    else if (protocol.equals(classOf[JobSubmissionProtocol].getName)) 
+       HamaRPCProtocolVersion.versionID
+    else throw new IOException("Unknown protocol to BSPMaster: " + protocol)
+  
+  override def start(): Boolean = {
+    executor.submit(instructor) 
+    true
+  }
+  
+  override def stop(): Boolean = {
+    executor.shutdownNow
+    true
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala b/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala
new file mode 100644
index 0000000..3c9a7e2
--- /dev/null
+++ b/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.master
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Create unique identifier per BSP master.
+ * TODO: replace with more unique value. 
+ */
+object IdentifierProvider {
+  
+  protected[master] val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+  
+  def newIdentifier(): String = formatter.format(new Date())
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/master/Master.scala b/core/src/main/scala/org/apache/hama/master/Master.scala
new file mode 100644
index 0000000..a896fa2
--- /dev/null
+++ b/core/src/main/scala/org/apache/hama/master/Master.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.master
+
+object Master {
+  
+}
+protected[master] class Master {
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/hama/master/Scheduler.scala b/core/src/main/scala/org/apache/hama/master/Scheduler.scala
index 8d9febf..d637978 100644
--- a/core/src/main/scala/org/apache/hama/master/Scheduler.scala
+++ b/core/src/main/scala/org/apache/hama/master/Scheduler.scala
@@ -17,7 +17,13 @@
  */
 package org.apache.hama.master
 
-trait Scheduler 
+trait Scheduler {
+  
+  def start()
+  
+  def stop()
+  
+}
 
 object Scheduler {
   
@@ -27,4 +33,11 @@ object Scheduler {
 
 protected[master] class DefaultScheduler extends Scheduler {
   
+  override def start() {
+    
+  }
+  
+  override def stop() {
+    
+  }
 }
\ No newline at end of file


Mime
View raw message