From commits-return-3625-archive-asf-public=cust-asf.ponee.io@hama.apache.org Sat Jun 16 20:25:53 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E4A36180625 for ; Sat, 16 Jun 2018 20:25:52 +0200 (CEST) Received: (qmail 75201 invoked by uid 500); 16 Jun 2018 18:25:52 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 75190 invoked by uid 99); 16 Jun 2018 18:25:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Jun 2018 18:25:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DF561E0C64; Sat, 16 Jun 2018 18:25:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: chl501@apache.org To: commits@hama.apache.org Message-Id: <15382ed30fc4420d8af9d8a877fbffd2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hama git commit: Add RPC service. Date: Sat, 16 Jun 2018 18:25:51 +0000 (UTC) Repository: hama Updated Branches: refs/heads/componentization 98a052b86 -> 7dfe85e3f Add RPC service. Project: http://git-wip-us.apache.org/repos/asf/hama/repo Commit: http://git-wip-us.apache.org/repos/asf/hama/commit/7dfe85e3 Tree: http://git-wip-us.apache.org/repos/asf/hama/tree/7dfe85e3 Diff: http://git-wip-us.apache.org/repos/asf/hama/diff/7dfe85e3 Branch: refs/heads/componentization Commit: 7dfe85e3fdaaaa19c429c03393f56204623a0b5b Parents: 98a052b Author: ChiaHung Lin Authored: Sat Jun 16 20:20:53 2018 +0200 Committer: ChiaHung Lin Committed: Sat Jun 16 20:20:53 2018 +0200 ---------------------------------------------------------------------- .../scala/org/apache/hama/conf/Setting.scala | 12 ++ .../scala/org/apache/hama/logging/Logging.scala | 2 +- .../org/apache/hama/master/Communicator.scala | 150 ++++++++++++++++++- .../apache/hama/master/IdentifierProvider.scala | 33 ++++ .../scala/org/apache/hama/master/Master.scala | 26 ++++ .../org/apache/hama/master/Scheduler.scala | 15 +- 6 files changed, 231 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/conf/Setting.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/conf/Setting.scala b/core/src/main/scala/org/apache/hama/conf/Setting.scala index 7145dd1..a415521 100644 --- a/core/src/main/scala/org/apache/hama/conf/Setting.scala +++ b/core/src/main/scala/org/apache/hama/conf/Setting.scala @@ -21,11 +21,13 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValue import com.typesafe.config.ConfigValueFactory +import org.apache.hama.HamaConfiguration import org.apache.hama.logging.Logging import scala.util.Failure import scala.util.Success import scala.util.Try import scala.reflect.ClassTag +import scala.collection.JavaConverters._ object Setting { @@ -39,6 +41,8 @@ trait Setting extends Logging { def get[T: ClassTag](key: String): Option[T] def set[T](key: String, default: T): Setting + + def hama(): HamaConfiguration } @@ -55,4 +59,12 @@ protected[conf] class DefaultSetting(config: Config) extends Setting { config.withValue(key, ConfigValueFactory.fromAnyRef(key)) ) + override def hama(): HamaConfiguration = + config.entrySet.asScala.foldLeft(new HamaConfiguration) { case (c, e) => + val key = e.getKey + val value = e.getValue + c.set(key, value.toString) + c + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/logging/Logging.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/logging/Logging.scala b/core/src/main/scala/org/apache/hama/logging/Logging.scala index b02341a..c971d2a 100644 --- a/core/src/main/scala/org/apache/hama/logging/Logging.scala +++ b/core/src/main/scala/org/apache/hama/logging/Logging.scala @@ -20,7 +20,7 @@ package org.apache.hama.logging import org.slf4j.LoggerFactory import java.io.Serializable -trait Logging extends Serializable { +trait Logging { val log = LoggerFactory.getLogger(getClass) http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Communicator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/master/Communicator.scala b/core/src/main/scala/org/apache/hama/master/Communicator.scala index 4eaae31..bf28c5c 100644 --- a/core/src/main/scala/org/apache/hama/master/Communicator.scala +++ b/core/src/main/scala/org/apache/hama/master/Communicator.scala @@ -17,14 +17,41 @@ */ package org.apache.hama.master -import org.apache.hama.util.Utils._ +import java.io.IOException +import java.net.InetSocketAddress +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Callable +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.Executors +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path +import org.apache.hama.bsp.Directive +import org.apache.hama.bsp.DirectiveHandler +import org.apache.hama.bsp.GroomServerStatus +import org.apache.hama.bsp.GroomStatusListener +import org.apache.hama.bsp.ReportGroomStatusDirective +import org.apache.hama.conf.Setting +import org.apache.hama.ipc.GroomProtocol +import org.apache.hama.ipc.HamaRPCProtocolVersion +import org.apache.hama.ipc.JobSubmissionProtocol +import org.apache.hama.ipc.MasterProtocol import org.apache.hama.ipc.RPC +import org.apache.hama.logging.Logging +import org.apache.hama.util.Utils._ +import scala.collection.JavaConverters._ +import scala.util.Failure +import scala.util.Success +import scala.util.Try +// TODO: move to net package case class Address(host: String, port: Int) { require(!isEmpty(host), s"Host is not provided!") - require(isValidPort(port), s"Invalid port range!") + require(isValidPort(port), s"Invalid port range: $port!") + + def inet: InetSocketAddress = new InetSocketAddress(host, port) } @@ -32,16 +59,129 @@ trait Communicator { def address(): Address + def start(): Boolean + + def stop(): Boolean + } object Communicator { - def create(host: String, port: Int): Communicator = - new DefaultCommunicator(Address(host, port)) + @deprecated + class Instructor extends Callable[Boolean] with Logging { + protected[master] val queue = new LinkedBlockingQueue[Directive] + protected[master] val handlers = new ConcurrentHashMap[Directive, DirectiveHandler] + + def bind(instruction: Directive, handler: DirectiveHandler) = + handlers.putIfAbsent(instruction, handler) + + def put(directive: Directive) = Try(queue.put(directive)) match { + case Success(_) => true + case Failure(ex) => false + } + + @throws(classOf[Exception]) + override def call(): Boolean = { + while(true) { Try { + val directive = queue.take + handlers.get(directive.getClass).handle(directive) + } match { + case Success(_) => + case Failure(ex) => ex match { + case _: InterruptedException => Thread.currentThread().interrupt + case _: Exception => log.error(s"Failing executing command because $ex") + + } + + }} + true + } + + } + + // TODO: + def create(host: String, port: Int)(implicit setting: Setting, fs: FileSystem): Communicator = + new RPCService(Address(host, port), setting, fs) } -protected[master] class DefaultCommunicator(addr: Address) extends Communicator { +@deprecated +protected[master] class RPCService(addr: Address, setting: Setting, fs: FileSystem) + extends Communicator with MasterProtocol with Logging { + + import Communicator._ + + require(null != setting, "Setting is not provided!") + require(null != fs, "FileSystem is not provided!") + + val executor = Executors.newSingleThreadExecutor + + protected[master] val grooms = new ConcurrentHashMap[GroomServerStatus, GroomProtocol] + protected[master] val listeners = new CopyOnWriteArrayList[GroomStatusListener] + protected[master] lazy val rpc = RPC.getServer( + this, address.host, address.port, setting.hama + ) + protected[master] lazy val instructor = { + val ins = new Instructor + /* TODO: + instructor.bind( + classOf[ReportGroomStatusDirective].asInstanceOf[Directive], + /new ReportGroomStatusHandler() + ) + */ + ins + } override def address(): Address = addr + + def add(listener: GroomStatusListener) = listeners.add(listener) + + def remove(listener: GroomStatusListener) = listeners.remove(listener) + + override def register(status: GroomServerStatus): Boolean = { + require(null != status, "GroomServerStatus is missing!") + Try(RPC.waitForProxy( + classOf[GroomProtocol], + HamaRPCProtocolVersion.versionID, + address.inet, + setting.hama + ).asInstanceOf[GroomProtocol]) match { + case Success(groom) => + grooms.putIfAbsent(status, groom) + listeners.asScala.foreach(_.groomServerRegistered(status)) + true + case Failure(ex) => + log.error(s"Fail registering GroomServer because $ex") + false + } + } + + @throws(classOf[IOException]) + override def report(directive: Directive): Boolean = + instructor.put(directive) + + override def getSystemDir(): String = { + val sysDir = new Path(setting.hama.get( + "bsp.system.dir", "/tmp/hadoop/bsp/system" + )) + fs.makeQualified(sysDir).toString() + } + + @throws(classOf[IOException]) + override def getProtocolVersion(protocol: String, clientVersion: Long) = + if (protocol.equals(classOf[MasterProtocol].getName)) + HamaRPCProtocolVersion.versionID + else if (protocol.equals(classOf[JobSubmissionProtocol].getName)) + HamaRPCProtocolVersion.versionID + else throw new IOException("Unknown protocol to BSPMaster: " + protocol) + + override def start(): Boolean = { + executor.submit(instructor) + true + } + + override def stop(): Boolean = { + executor.shutdownNow + true + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala b/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala new file mode 100644 index 0000000..3c9a7e2 --- /dev/null +++ b/core/src/main/scala/org/apache/hama/master/IdentifierProvider.scala @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.master + +import java.text.SimpleDateFormat +import java.util.Date + +/** + * Create unique identifier per BSP master. + * TODO: replace with more unique value. + */ +object IdentifierProvider { + + protected[master] val formatter = new SimpleDateFormat("yyyyMMddHHmm") + + def newIdentifier(): String = formatter.format(new Date()) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/master/Master.scala b/core/src/main/scala/org/apache/hama/master/Master.scala new file mode 100644 index 0000000..a896fa2 --- /dev/null +++ b/core/src/main/scala/org/apache/hama/master/Master.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.master + +object Master { + +} +protected[master] class Master { + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hama/blob/7dfe85e3/core/src/main/scala/org/apache/hama/master/Scheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/hama/master/Scheduler.scala b/core/src/main/scala/org/apache/hama/master/Scheduler.scala index 8d9febf..d637978 100644 --- a/core/src/main/scala/org/apache/hama/master/Scheduler.scala +++ b/core/src/main/scala/org/apache/hama/master/Scheduler.scala @@ -17,7 +17,13 @@ */ package org.apache.hama.master -trait Scheduler +trait Scheduler { + + def start() + + def stop() + +} object Scheduler { @@ -27,4 +33,11 @@ object Scheduler { protected[master] class DefaultScheduler extends Scheduler { + override def start() { + + } + + override def stop() { + + } } \ No newline at end of file