From commits-return-479-archive-asf-public=cust-asf.ponee.io@ratis.incubator.apache.org Wed Dec 5 04:03:20 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A7EFF180652 for ; Wed, 5 Dec 2018 04:03:18 +0100 (CET) Received: (qmail 11976 invoked by uid 500); 5 Dec 2018 03:03:17 -0000 Mailing-List: contact commits-help@ratis.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ratis.incubator.apache.org Delivered-To: mailing list commits@ratis.incubator.apache.org Received: (qmail 11959 invoked by uid 99); 5 Dec 2018 03:03:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Dec 2018 03:03:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 11824CB614 for ; Wed, 5 Dec 2018 03:03:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -11.701 X-Spam-Level: X-Spam-Status: No, score=-11.701 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, SPF_PASS=-0.001, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id o4U_MMP1kM0Q for ; Wed, 5 Dec 2018 03:03:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 22D1B60F89 for ; Wed, 5 Dec 2018 03:03:10 +0000 (UTC) Received: (qmail 11533 invoked by uid 99); 5 Dec 2018 03:03:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Dec 2018 03:03:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 33874E0137; Wed, 5 Dec 2018 03:03:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@ratis.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-ratis git commit: RATIS-378. Consolidate common logic across logservice servers Date: Wed, 5 Dec 2018 03:03:10 +0000 (UTC) Repository: incubator-ratis Updated Branches: refs/heads/master d72d9c6eb -> 046782430 RATIS-378. Consolidate common logic across logservice servers Signed-off-by: Rajeshbabu Chintaguntla Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/04678243 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/04678243 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/04678243 Branch: refs/heads/master Commit: 046782430a9aee3094869ca2b63bcf0194c70e93 Parents: d72d9c6 Author: Josh Elser Authored: Wed Nov 28 12:59:14 2018 -0500 Committer: Josh Elser Committed: Tue Dec 4 21:38:23 2018 -0500 ---------------------------------------------------------------------- .../logservice/client/LogServiceClient.java | 2 +- .../ratis/logservice/common/Constants.java | 6 +- .../ratis/logservice/server/BaseServer.java | 94 ++++++++++ .../ratis/logservice/server/LogServer.java | 138 ++++++++++++++ .../ratis/logservice/server/MasterServer.java | 178 ------------------- .../logservice/server/MetaStateMachine.java | 21 ++- .../ratis/logservice/server/MetadataServer.java | 146 +++++++++++++++ .../ratis/logservice/server/ServerOpts.java | 119 +++++++++++++ .../ratis/logservice/util/LogServiceUtils.java | 1 - .../logservice/worker/LogServiceWorker.java | 161 ----------------- .../ratis/logservice/server/TestMetaServer.java | 14 +- .../logservice/util/LogServiceCluster.java | 57 +++--- 12 files changed, 558 insertions(+), 379 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java index 5bdfb93..1e42ab4 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/client/LogServiceClient.java @@ -68,7 +68,7 @@ public class LogServiceClient implements AutoCloseable { public LogServiceClient(String metaQuorum, LogServiceConfiguration config) { Set peers = getPeersFromQuorum(metaQuorum); RaftProperties properties = new RaftProperties(); - RaftGroup meta = RaftGroup.valueOf(Constants.metaGroupID, peers); + RaftGroup meta = RaftGroup.valueOf(Constants.META_GROUP_ID, peers); client = RaftClient.newBuilder() .setRaftGroup(meta) .setClientId(ClientId.randomId()) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java index ba50154..2da892b 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/common/Constants.java @@ -24,8 +24,10 @@ import java.util.UUID; public class Constants { - final public static RaftGroupId metaGroupID = RaftGroupId.valueOf(new UUID(0,1)); + public static final UUID META_GROUP_UUID = new UUID(0,1); + public static final RaftGroupId META_GROUP_ID = RaftGroupId.valueOf(META_GROUP_UUID); - final public static RaftGroupId serversGroupID = RaftGroupId.valueOf(new UUID(0,2)); + public static final UUID SERVERS_GROUP_UUID = new UUID(0,2); + public static final RaftGroupId SERVERS_GROUP_ID = RaftGroupId.valueOf(SERVERS_GROUP_UUID); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java new file mode 100644 index 0000000..717d5b6 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/BaseServer.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.server; + +import java.io.Closeable; +import java.net.InetSocketAddress; +import java.util.Objects; + +import org.apache.ratis.logservice.util.LogServiceUtils; +import org.apache.ratis.util.NetUtils; + +/** + * A base class to encapsulate functionality around a long-lived Java process which runs a state machine. + */ +public abstract class BaseServer implements Closeable { + + private final ServerOpts opts; + + public BaseServer(ServerOpts opts) { + this.opts = Objects.requireNonNull(opts); + } + + public ServerOpts getServerOpts() { + return opts; + } + + static ServerOpts buildOpts(String hostname, String metaQuorum, int port, String workingDir) { + ServerOpts opts = new ServerOpts(); + opts.setHost(hostname); + opts.setMetaQuorum(metaQuorum); + opts.setPort(port); + opts.setWorkingDir(workingDir); + return opts; + } + + public abstract static class Builder { + private ServerOpts opts = new ServerOpts(); + + protected ServerOpts getOpts() { + return opts; + } + + public abstract T build(); + + public Builder validate() { + if (opts.getPort() == -1) { + InetSocketAddress addr = NetUtils.createLocalServerAddress(); + opts.setPort(addr.getPort()); + } + if (opts.getHost() == null) { + opts.setHost(LogServiceUtils.getHostName()); + } + if (opts.getWorkingDir() == null) { + throw new IllegalArgumentException("Working directory was not specified"); + } + return this; + } + + public Builder setMetaQuorum(String meta) { + opts.setMetaQuorum(meta); + return this; + } + + public Builder setPort(int port) { + opts.setPort(port); + return this; + } + + public Builder setWorkingDir(String workingDir) { + opts.setWorkingDir(workingDir); + return this; + } + + public Builder setHostName(String hostName) { + opts.setHost(hostName); + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java new file mode 100644 index 0000000..d68a9f7 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.server; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Set; + +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.logservice.util.LogServiceUtils; +import org.apache.ratis.logservice.util.MetaServiceProtoUtil; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; + +public class LogServer extends BaseServer { + private static final Logger LOG = LoggerFactory.getLogger(LogServer.class); + + private RaftServer raftServer = null; + private RaftClient metaClient = null; + + public LogServer(ServerOpts opts) { + super(opts); + } + + public RaftServer getServer() { + return raftServer; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public void start() throws IOException { + final ServerOpts opts = getServerOpts(); + Set peers = LogServiceUtils.getPeersFromQuorum(opts.getMetaQuorum()); + RaftProperties properties = new RaftProperties(); + properties.set("raft.client.rpc.request.timeout", "100000"); + GrpcConfigKeys.Server.setPort(properties, opts.getPort()); + NettyConfigKeys.Server.setPort(properties, opts.getPort()); + InetSocketAddress addr = new InetSocketAddress(opts.getHost(), opts.getPort()); + if(opts.getWorkingDir() != null) { + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(opts.getWorkingDir()))); + } + String id = opts.getHost() +"_" + opts.getPort(); + RaftPeer peer = new RaftPeer(RaftPeerId.valueOf(id), addr); + final RaftGroupId logServerGroupId = RaftGroupId.valueOf(opts.getLogServerGroupId()); + RaftGroup all = RaftGroup.valueOf(logServerGroupId, peer); + RaftGroup meta = RaftGroup.valueOf(RaftGroupId.valueOf(opts.getMetaGroupId()), peers); + raftServer = RaftServer.newBuilder() + .setStateMachineRegistry(new StateMachine.Registry() { + private final StateMachine managementMachine = new ManagementStateMachine(); + private final StateMachine logMachine = new LogStateMachine(); + @Override + public StateMachine apply(RaftGroupId raftGroupId) { + // TODO this looks wrong. Why isn't this metaGroupId? + if(raftGroupId.equals(logServerGroupId)) { + return managementMachine; + } + return logMachine; + } + }) + .setProperties(properties) + .setServerId(RaftPeerId.valueOf(id)) + .setGroup(all) + .build(); + raftServer.start(); + + metaClient = RaftClient.newBuilder() + .setRaftGroup(meta) + .setClientId(ClientId.randomId()) + .setProperties(properties) + .build(); + metaClient.send(() -> MetaServiceProtoUtil.toPingRequestProto(peer).toByteString()); + } + + public static void main(String[] args) throws IOException { + ServerOpts opts = new ServerOpts(); + JCommander.newBuilder() + .addObject(opts) + .build() + .parse(args); + + try (LogServer worker = new LogServer(opts)) { + worker.start(); + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + + public void close() throws IOException { + raftServer.close(); + } + + public static class Builder extends BaseServer.Builder { + public LogServer build() { + validate(); + return new LogServer(getOpts()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java deleted file mode 100644 index 8e9f7d3..0000000 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MasterServer.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ratis.logservice.server; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.logservice.common.Constants; -import org.apache.ratis.logservice.util.LogServiceUtils; -import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.protocol.*; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.FileUtils; -import org.apache.ratis.util.LifeCycle; - -import java.io.Closeable; -import java.io.File; -import java.io.IOException; -import java.net.*; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Set; - -import static org.apache.ratis.logservice.common.Constants.metaGroupID; -import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum; - -/** - * Master quorum is responsible for tracking all available quorum members - */ -public class MasterServer implements Closeable { - - - // RaftServer internal server. Has meta raft group and MetaStateMachine - private RaftServer server; - - private String id; - - private String host; - - @Parameter(names = "-port", description = "Port number") - private int port = 9999; - - @Parameter(names = "-dir", description = "Working directory") - private String workingDir = null; - - private StateMachine metaStateMachine; - - private LifeCycle lifeCycle; - - public MasterServer(String hostname, int port, String workingDir) { - this.port = port; - this.host = hostname; - this.workingDir = workingDir; - id = host + "_" + port; - this.lifeCycle = new LifeCycle(this.id); - - } - - public MasterServer() { - - } - - public void start(String metaGroupId) throws IOException { - if (host == null) { - host = LogServiceUtils.getHostName(); - } - this.lifeCycle = new LifeCycle(this.id); - RaftProperties properties = new RaftProperties(); - if(workingDir != null) { - RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(workingDir))); - } - GrpcConfigKeys.Server.setPort(properties, port); - NettyConfigKeys.Server.setPort(properties, port); - Set peers = getPeersFromQuorum(metaGroupId); - RaftGroup metaGroup = RaftGroup.valueOf(Constants.metaGroupID, peers); - metaStateMachine = new MetaStateMachine(); - server = RaftServer.newBuilder() - .setGroup(metaGroup) - .setServerId(RaftPeerId.valueOf(id)) - .setStateMachineRegistry(raftGroupId -> { - if(raftGroupId.equals(metaGroupID)) { - return metaStateMachine; - } - return null; - }) - .setProperties(properties).build(); - lifeCycle.startAndTransition(() -> { - server.start(); - }, IOException.class); - } - - public static void main(String[] args) throws IOException { - MasterServer master = new MasterServer(); - JCommander.newBuilder() - .addObject(master) - .build() - .parse(args); - master.start(null); - - - } - public static MasterServer.Builder newBuilder() { - return new MasterServer.Builder(); - } - - @Override - public void close() throws IOException { - server.close(); - } - - public String getId() { - return id; - } - - public String getAddress() { - return host + ":" + port; - } - - public void cleanUp() throws IOException { - FileUtils.deleteFully(new File(workingDir)); - } - - public static class Builder { - private String host = null; - private int port = 9999; - private String workingDir = null; - - /** - * @return a {@link MasterServer} object. - */ - public MasterServer build() { - if (host == null) { - host = LogServiceUtils.getHostName(); - } - return new MasterServer(host, port, workingDir); - } - - /** - * Set the server hostname. - */ - public Builder setHost(String host) { - this.host = host; - return this; - } - - /** - * Set server port - */ - public Builder setPort(int port) { - this.port = port; - return this; - } - - public Builder setWorkingDir(String workingDir) { - this.workingDir = workingDir; - return this; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java index 6a386b8..24f5263 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java @@ -18,9 +18,6 @@ package org.apache.ratis.logservice.server; -import static org.apache.ratis.logservice.common.Constants.metaGroupID; -import static org.apache.ratis.logservice.common.Constants.serversGroupID; - import java.io.IOException; import java.util.Collection; import java.util.HashSet; @@ -85,7 +82,7 @@ public class MetaStateMachine extends BaseStateMachine { //Persisted map between log and RaftGroup private Map map = new ConcurrentHashMap<>(); // List of the currently known peers. - private final Set peers = new HashSet(); + private final Set peers = new HashSet<>(); // keep a copy of raftServer to get group information. private RaftServer raftServer; @@ -101,6 +98,13 @@ public class MetaStateMachine extends BaseStateMachine { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + private RaftGroupId metadataGroupId; + private RaftGroupId logServerGroupId; + + public MetaStateMachine(RaftGroupId metadataGroupId, RaftGroupId logServerGroupId) { + this.metadataGroupId = metadataGroupId; + this.logServerGroupId = logServerGroupId; + } @Override public void initialize(RaftServer server, RaftGroupId groupId, RaftStorage storage) throws IOException { @@ -170,7 +174,9 @@ public class MetaStateMachine extends BaseStateMachine { public CompletableFuture query(Message request) { if (currentGroup == null) { try { - List x = StreamSupport.stream(raftServer.getGroups().spliterator(), false).filter(group -> group.getGroupId().equals(metaGroupID)).collect(Collectors.toList()); + List x = StreamSupport.stream(raftServer.getGroups().spliterator(), false) + .filter(group -> group.getGroupId().equals(metadataGroupId)) + .collect(Collectors.toList()); if (x.size() == 1) { currentGroup = x.get(0); } @@ -221,7 +227,7 @@ public class MetaStateMachine extends BaseStateMachine { RaftClient client = RaftClient.newBuilder() .setProperties(properties) .setClientId(ClientId.randomId()) - .setRaftGroup(RaftGroup.valueOf(serversGroupID, peer)) + .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)) .build(); try { client.groupRemove(raftGroup.getGroupId(), true, peer.getId()); @@ -301,7 +307,8 @@ public class MetaStateMachine extends BaseStateMachine { avail.add(pg); }); peers.forEach(i -> { - RaftClient client = RaftClient.newBuilder().setProperties(properties).setRaftGroup(RaftGroup.valueOf(serversGroupID, i)).build(); + RaftClient client = RaftClient.newBuilder().setProperties(properties) + .setRaftGroup(RaftGroup.valueOf(logServerGroupId, i)).build(); try { client.groupAdd(raftGroup, i.getId()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java new file mode 100644 index 0000000..1d658fe --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetadataServer.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.logservice.server; + +import com.beust.jcommander.JCommander; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.logservice.util.LogServiceUtils; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.LifeCycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +import static org.apache.ratis.logservice.common.Constants.META_GROUP_ID; +import static org.apache.ratis.logservice.util.LogServiceUtils.getPeersFromQuorum; + +/** + * Master quorum is responsible for tracking all available quorum members + */ +public class MetadataServer extends BaseServer { + private static final Logger LOG = LoggerFactory.getLogger(MetadataServer.class); + + // RaftServer internal server. Has meta raft group and MetaStateMachine + private RaftServer server; + + private String id; + + private StateMachine metaStateMachine; + + private LifeCycle lifeCycle; + + public MetadataServer(ServerOpts opts) { + super(opts); + LOG.debug("Metadata Server options: {}", opts); + this.id = opts.getHost() + "_" + opts.getPort(); + this.lifeCycle = new LifeCycle(this.id); + } + + public void start() throws IOException { + final ServerOpts opts = getServerOpts(); + if (opts.getHost() == null) { + opts.setHost(LogServiceUtils.getHostName()); + } + this.lifeCycle = new LifeCycle(this.id); + RaftProperties properties = new RaftProperties(); + if(opts.getWorkingDir() != null) { + RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(opts.getWorkingDir()))); + } + GrpcConfigKeys.Server.setPort(properties, opts.getPort()); + NettyConfigKeys.Server.setPort(properties, opts.getPort()); + Set peers = getPeersFromQuorum(opts.getMetaQuorum()); + RaftGroupId raftMetaGroupId = RaftGroupId.valueOf(opts.getMetaGroupId()); + RaftGroup metaGroup = RaftGroup.valueOf(raftMetaGroupId, peers); + metaStateMachine = new MetaStateMachine(raftMetaGroupId, RaftGroupId.valueOf(opts.getLogServerGroupId())); + server = RaftServer.newBuilder() + .setGroup(metaGroup) + .setServerId(RaftPeerId.valueOf(id)) + .setStateMachineRegistry(raftGroupId -> { + if(raftGroupId.equals(META_GROUP_ID)) { + return metaStateMachine; + } + return null; + }) + .setProperties(properties).build(); + lifeCycle.startAndTransition(() -> { + server.start(); + }, IOException.class); + } + + public static void main(String[] args) throws IOException { + ServerOpts opts = new ServerOpts(); + JCommander.newBuilder() + .addObject(opts) + .build() + .parse(args); + + try (MetadataServer master = new MetadataServer(opts)) { + master.start(); + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + public static MetadataServer.Builder newBuilder() { + return new MetadataServer.Builder(); + } + + @Override + public void close() throws IOException { + server.close(); + } + + public String getId() { + return id; + } + + public String getAddress() { + return getServerOpts().getHost() + ":" + getServerOpts().getPort(); + } + + public void cleanUp() throws IOException { + FileUtils.deleteFully(new File(getServerOpts().getWorkingDir())); + } + + public static class Builder extends BaseServer.Builder { + /** + * @return a {@link MetadataServer} object. + */ + public MetadataServer build() { + validate(); + return new MetadataServer(getOpts()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java new file mode 100644 index 0000000..18e4ba6 --- /dev/null +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/ServerOpts.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.logservice.server; + +import java.util.UUID; + +import org.apache.ratis.logservice.common.Constants; + +import com.beust.jcommander.IStringConverter; +import com.beust.jcommander.Parameter; + +/** + * Options that are common across metadata and log server classes. + */ +public class ServerOpts { + /** + * Converts a string into a UUID + */ + static class UUIDConverter implements IStringConverter { + @Override public UUID convert(String value) { + return UUID.fromString(value); + } + } + + @Parameter(names = {"-h", "--hostname"}, description = "Hostname") + private String host; + + @Parameter(names = {"-p", "--port"}, description = "Port number") + private int port = -1; + + @Parameter(names = {"-d", "--dir"}, description = "Working directory") + private String workingDir = null; + + @Parameter(names = {"-q", "--metaQuorum"}, description = "Metadata Service Quorum") + private String metaQuorum; + + @Parameter(names = {"--metadataServerGroupId"}, description = "UUID corresponding to the RAFT metadata servers group", + converter = UUIDConverter.class) + private UUID metaGroupId = Constants.META_GROUP_UUID; + + @Parameter(names = {"--logServerGroupId"}, description = "UUID corresponding to the RAFT log servers group", + converter = UUIDConverter.class) + private UUID logServerGroupId = Constants.SERVERS_GROUP_UUID; + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getWorkingDir() { + return workingDir; + } + + public void setWorkingDir(String workingDir) { + this.workingDir = workingDir; + } + + public String getMetaQuorum() { + return metaQuorum; + } + + public void setMetaQuorum(String metaQuorum) { + this.metaQuorum = metaQuorum; + } + + public UUID getMetaGroupId() { + return metaGroupId; + } + + public void setMetaGroupId(UUID metaGroupId) { + this.metaGroupId = metaGroupId; + } + + public UUID getLogServerGroupId() { + return logServerGroupId; + } + + public void setLogServerGroupId(UUID logServerGroupId) { + this.logServerGroupId = logServerGroupId; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Hostname=").append(host); + sb.append(", port=").append(port); + sb.append(", dir=").append(workingDir); + sb.append(", metaQuorum=").append(metaQuorum); + sb.append(", metaGroupId=").append(metaGroupId); + sb.append(", logServerGroupId=").append(logServerGroupId); + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java index a25a7df..c44853f 100644 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java +++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceUtils.java @@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftPeerId; import java.net.DatagramSocket; import java.net.InetAddress; -import java.net.SocketException; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java deleted file mode 100644 index e8fd895..0000000 --- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/worker/LogServiceWorker.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ratis.logservice.worker; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import org.apache.ratis.client.RaftClient; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.logservice.server.LogStateMachine; -import org.apache.ratis.logservice.server.ManagementStateMachine; -import org.apache.ratis.logservice.util.MetaServiceProtoUtil; -import org.apache.ratis.logservice.util.LogServiceUtils; -import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.protocol.*; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.statemachine.StateMachine; -import org.apache.ratis.util.NetUtils; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Collections; -import java.util.Set; - -import static org.apache.ratis.logservice.common.Constants.metaGroupID; -import static org.apache.ratis.logservice.common.Constants.serversGroupID; - -public class LogServiceWorker implements Cloneable{ - - @Parameter(names = "-port", description = "Port number") - private int port; - - @Parameter(names = "-dir", description = "Working directory") - private String workingDir; - - @Parameter(names = "-meta", description = "Meta Quorum ID") - private String metaIdentity; - RaftServer raftServer = null; - RaftClient metaClient = null; - - public LogServiceWorker() { - - } - public LogServiceWorker(String meta, int port, String workingDir) { - this.metaIdentity = meta; - this.port = port; - this.workingDir = workingDir; - } - - public RaftServer getServer() { - return raftServer; - } - - public static Builder newBuilder() { - return new Builder(); - } - - public void start() throws IOException { - Set peers = LogServiceUtils.getPeersFromQuorum(metaIdentity); - String host = LogServiceUtils.getHostName(); - RaftProperties properties = new RaftProperties(); - properties.set("raft.client.rpc.request.timeout", "100000"); - GrpcConfigKeys.Server.setPort(properties, port); - NettyConfigKeys.Server.setPort(properties, port); - InetSocketAddress addr = new InetSocketAddress(host,port); - if(workingDir != null) { - RaftServerConfigKeys.setStorageDirs(properties, Collections.singletonList(new File(workingDir))); - } - String id = host +"_" + port; - RaftPeer peer = new RaftPeer(RaftPeerId.valueOf(id), addr); - RaftGroup all = RaftGroup.valueOf(serversGroupID, peer); - RaftGroup meta = RaftGroup.valueOf(metaGroupID, peers); - raftServer = RaftServer.newBuilder() - .setStateMachineRegistry(new StateMachine.Registry() { - final StateMachine managementMachine = new ManagementStateMachine(); - final StateMachine logMachine = new LogStateMachine(); - @Override - public StateMachine apply(RaftGroupId raftGroupId) { - if(raftGroupId.equals(serversGroupID)) { - return managementMachine; - } - return logMachine; - } - }) - .setProperties(properties) - .setServerId(RaftPeerId.valueOf(id)) - .setGroup(all) - .build(); - raftServer.start(); - - metaClient = RaftClient.newBuilder() - .setRaftGroup(meta) - .setClientId(ClientId.randomId()) - .setProperties(properties) - .build(); - metaClient.send(() -> MetaServiceProtoUtil.toPingRequestProto(peer).toByteString()); - } - - public static void main(String[] args) throws IOException { - LogServiceWorker worker = new LogServiceWorker(); - JCommander.newBuilder() - .addObject(worker) - .build() - .parse(args); - worker.start(); - - - } - - - public void close() throws IOException { - raftServer.close(); - } - - public static class Builder { - String meta; - int port = -1; - private String workingDir; - - public LogServiceWorker build() { - if(port == -1) { - InetSocketAddress addr = NetUtils.createLocalServerAddress(); - port = addr.getPort(); - } - return new LogServiceWorker(meta, port, workingDir); - } - public Builder setMetaIdentity(String meta) { - this.meta = meta; - return this; - } - public Builder setPort(int port) { - this.port = port; - return this; - } - - public Builder setWorkingDir(String workingDir) { - this.workingDir = workingDir; - return this; - } - - - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java index 2f5c3d7..cbdd2b5 100644 --- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java @@ -22,8 +22,8 @@ import org.apache.ratis.logservice.api.*; import org.apache.ratis.logservice.client.LogServiceClient; import org.apache.ratis.logservice.common.LogAlreadyExistException; import org.apache.ratis.logservice.common.LogNotFoundException; +import org.apache.ratis.logservice.server.LogServer; import org.apache.ratis.logservice.util.LogServiceCluster; -import org.apache.ratis.logservice.worker.LogServiceWorker; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.junit.AfterClass; @@ -48,13 +48,15 @@ public class TestMetaServer { public static void beforeClass() { cluster = new LogServiceCluster(3); cluster.createWorkers(3); - List workers = cluster.getWorkers(); + List workers = cluster.getWorkers(); assert(workers.size() == 3); } @AfterClass public static void afterClass() { - cluster.close(); + if (cluster != null) { + cluster.close(); + } } /** @@ -80,14 +82,14 @@ public class TestMetaServer { ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes()); List listLogs = client.listLogs(); assert(listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1); - List workers = cluster.getWorkers(); - for(LogServiceWorker worker : workers) { + List workers = cluster.getWorkers(); + for(LogServer worker : workers) { RaftServerImpl server = ((RaftServerProxy)worker.getServer()) .getImpl(listLogs.get(0).getRaftGroup().getGroupId()); // TODO: perform all additional checks on state machine level } writer.write(testMessage); - for(LogServiceWorker worker : workers) { + for(LogServer worker : workers) { RaftServerImpl server = ((RaftServerProxy)worker.getServer()) .getImpl(listLogs.get(0).getRaftGroup().getGroupId()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/04678243/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java ---------------------------------------------------------------------- diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java index d1c688b..29999dd 100644 --- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java +++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/util/LogServiceCluster.java @@ -19,16 +19,13 @@ package org.apache.ratis.logservice.util; -import org.apache.commons.io.FileUtils; import org.apache.ratis.BaseTest; import org.apache.ratis.logservice.api.LogName; import org.apache.ratis.logservice.api.LogStream; -import org.apache.ratis.logservice.api.LogInfo; import org.apache.ratis.logservice.client.LogServiceClient; -import org.apache.ratis.logservice.worker.LogServiceWorker; -import org.apache.ratis.logservice.server.MasterServer; +import org.apache.ratis.logservice.server.LogServer; +import org.apache.ratis.logservice.server.MetadataServer; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -40,8 +37,8 @@ import java.util.stream.IntStream; */ public class LogServiceCluster implements AutoCloseable { - private List masters; - private List workers = new ArrayList<>(); + private List masters; + private List workers = new ArrayList<>(); private String baseTestDir = BaseTest.getRootTestDir().getAbsolutePath(); /** @@ -50,9 +47,11 @@ public class LogServiceCluster implements AutoCloseable { */ public void createWorkers(int numWorkers) { String meta = getMetaIdentity(); - List newWorkers = IntStream.range(0, numWorkers).parallel().mapToObj(i -> - LogServiceWorker.newBuilder() - .setMetaIdentity(meta) + List newWorkers = IntStream.range(0, numWorkers).parallel().mapToObj(i -> + LogServer.newBuilder() + .setHostName("localhost") + .setPort(10000 + i) + .setMetaQuorum(meta) .setWorkingDir(baseTestDir + "/workers/" + i) .build()).collect(Collectors.toList()); newWorkers.parallelStream().forEach( worker -> { @@ -70,6 +69,7 @@ public class LogServiceCluster implements AutoCloseable { * @return the string that represent the meta quorum ID that can can be used to manually create a worker nodes */ public String getMetaIdentity() { + // Nb. Can only be called after the masters have been instantiated. return masters.stream().map(object -> object.getAddress()).collect(Collectors.joining(",")); } @@ -80,17 +80,28 @@ public class LogServiceCluster implements AutoCloseable { */ public LogServiceCluster(int numServers) { + // Have to construct the meta quorum by hand -- `getMetaIdentity()` requires + // uses the masters to build the quorum (chicken and egg problem). + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < numServers; i++) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append("localhost:").append(9000 + i); + } + String metaQuorum = sb.toString(); this.masters = IntStream.range(0, numServers).parallel().mapToObj(i -> - MasterServer.newBuilder() - .setHost(LogServiceUtils.getHostName()) + MetadataServer.newBuilder() + .setHostName("localhost") .setPort(9000 + i) .setWorkingDir(baseTestDir + "/masters/" + i) + .setMetaQuorum(metaQuorum) .build()) .collect(Collectors.toList()); masters.parallelStream().forEach(master -> { try { master.cleanUp(); - master.start(getMetaIdentity()); + master.start(); } catch (IOException e) { throw new RuntimeException(e); } @@ -103,15 +114,16 @@ public class LogServiceCluster implements AutoCloseable { * @param logName * @throws IOException */ - public LogStream createLog(LogName logName) throws IOException { - LogServiceClient client = new LogServiceClient(getMetaIdentity()); - return client.createLog(logName); + public LogStream createLog(LogName logName) throws Exception { + try (LogServiceClient client = new LogServiceClient(getMetaIdentity())) { + return client.createLog(logName); + } } /** * @return the current set of the workers */ - public List getWorkers() { + public List getWorkers() { return workers; } @@ -119,7 +131,7 @@ public class LogServiceCluster implements AutoCloseable { * * @return the current set of the masters */ - public List getMasters() { + public List getMasters() { return masters; } @@ -146,10 +158,10 @@ public class LogServiceCluster implements AutoCloseable { }); } - public LogStream getLog(LogName logName) throws IOException { - LogServiceClient client = new LogServiceClient(getMetaIdentity()); - return client.getLog(logName); - + public LogStream getLog(LogName logName) throws Exception { + try (LogServiceClient client = new LogServiceClient(getMetaIdentity())) { + return client.getLog(logName); + } } /** @@ -157,6 +169,5 @@ public class LogServiceCluster implements AutoCloseable { */ public void cleanUp() { // FileUtils.deleteDirectory(); - } }