From commits-return-350-archive-asf-public=cust-asf.ponee.io@myriad.incubator.apache.org Wed Sep 12 17:52:06 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9A4D8180630 for ; Wed, 12 Sep 2018 17:52:04 +0200 (CEST) Received: (qmail 21836 invoked by uid 500); 12 Sep 2018 15:52:03 -0000 Mailing-List: contact commits-help@myriad.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@myriad.incubator.apache.org Delivered-To: mailing list commits@myriad.incubator.apache.org Received: (qmail 21827 invoked by uid 99); 12 Sep 2018 15:52:03 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Sep 2018 15:52:03 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 2DF06C86DA for ; Wed, 12 Sep 2018 15:52:03 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -11.201 X-Spam-Level: X-Spam-Status: No, score=-11.201 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_NUMSUBJECT=0.5, RCVD_IN_DNSWL_HI=-5, SPF_PASS=-0.001, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 7SzpuJduydm3 for ; Wed, 12 Sep 2018 15:52:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id E78915F500 for ; Wed, 12 Sep 2018 15:51:58 +0000 (UTC) Received: (qmail 21514 invoked by uid 99); 12 Sep 2018 15:51:58 -0000 Received: from git1-us-west.apache.org (HELO 
git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Sep 2018 15:51:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C8143E0A43; Wed, 12 Sep 2018 15:51:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jpgilaberte@apache.org To: commits@myriad.incubator.apache.org Date: Wed, 12 Sep 2018 15:52:02 -0000 Message-Id: <353ce900303240e29ed080b80dc298e7@git.apache.org> In-Reply-To: <1a6e109a160144e6832ee9a0c8ce40e2@git.apache.org> References: <1a6e109a160144e6832ee9a0c8ce40e2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/26] incubator-myriad git commit: Upgrade mesos driver to Mesos 1.5 with protobuf 2.5 http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java b/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java new file mode 100644 index 0000000..8ac1498 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos.state; + +import java.util.Iterator; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.mesos.MesosNativeLibrary; + +/** + * Abstract implementation of State that provides operations on + * futures to make concrete classes easier to create. + */ +public abstract class AbstractState implements State { + static { + MesosNativeLibrary.load(); + } + + @Override + public Future fetch(final String name) { + if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) { + return new FetchFuture(name); + } + + // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161). + final long future = __fetch(name); // Asynchronously start the operation. + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + return __fetch_cancel(future); + } + return false; // Should not interrupt and already running (or finished). 
+ } + + @Override + public boolean isCancelled() { + return __fetch_is_cancelled(future); + } + + @Override + public boolean isDone() { + return __fetch_is_done(future); + } + + @Override + public Variable get() throws InterruptedException, ExecutionException { + return __fetch_get(future); + } + + @Override + public Variable get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return __fetch_get_timeout(future, timeout, unit); + } + + @Override + protected void finalize() { + __fetch_finalize(future); + } + }; + } + + @Override + public Future store(Variable variable) { + if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) { + return new StoreFuture(variable); + } + + // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161). + final long future = __store(variable); // Asynchronously start the operation. + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + return __store_cancel(future); + } + return false; // Should not interrupt and already running (or finished). + } + + @Override + public boolean isCancelled() { + return __store_is_cancelled(future); + } + + @Override + public boolean isDone() { + return __store_is_done(future); + } + + @Override + public Variable get() throws InterruptedException, ExecutionException { + return __store_get(future); + } + + @Override + public Variable get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return __store_get_timeout(future, timeout, unit); + } + + @Override + protected void finalize() { + __store_finalize(future); + } + }; + } + + @Override + public Future expunge(Variable variable) { + if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) { + return new ExpungeFuture(variable); + } + + // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161). 
+ final long future = __expunge(variable); // Asynchronously start the operation. + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + return __expunge_cancel(future); + } + return false; // Should not interrupt and already running (or finished). + } + + @Override + public boolean isCancelled() { + return __expunge_is_cancelled(future); + } + + @Override + public boolean isDone() { + return __expunge_is_done(future); + } + + @Override + public Boolean get() throws InterruptedException, ExecutionException { + return __expunge_get(future); + } + + @Override + public Boolean get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return __expunge_get_timeout(future, timeout, unit); + } + + @Override + protected void finalize() { + __expunge_finalize(future); + } + }; + } + + public Future> names() { + if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) { + return new NamesFuture(); + } + + // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161). + final long future = __names(); // Asynchronously start the operation. + return new Future>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + return __names_cancel(future); + } + return false; // Should not interrupt and already running (or finished). 
+ } + + @Override + public boolean isCancelled() { + return __names_is_cancelled(future); + } + + @Override + public boolean isDone() { + return __names_is_done(future); + } + + @Override + public Iterator get() throws InterruptedException, + ExecutionException { + return __names_get(future); + } + + @Override + public Iterator get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return __names_get_timeout(future, timeout, unit); + } + + @Override + protected void finalize() { + __names_finalize(future); + } + }; + } + + protected native void finalize(); + + // Native implementations of 'fetch', 'store', 'expunge', and 'names'. We wrap + // them in classes to carry the java references correctly through the JNI + // bindings (MESOS-2161). The native functions in AbstractState will be + // deprecated in 0.24. + + private class FetchFuture implements Future { + + public FetchFuture(String name) { + future = __fetch(name); + } + + @Override + public native boolean cancel(boolean mayInterruptIfRunning); + + @Override + public native boolean isCancelled(); + + @Override + public native boolean isDone(); + + @Override + public native Variable get() + throws InterruptedException, ExecutionException; + + @Override + public native Variable get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + + @Override + protected native void finalize(); + + private long future; + } + + private native long __fetch(String name); + + // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track + // the java object references correctly. (MESOS-2161). The above 'FetchFuture' + // class fixes this bug. We leave the below functions for backwards + // compatibility. 
+ private native boolean __fetch_cancel(long future); + private native boolean __fetch_is_cancelled(long future); + private native boolean __fetch_is_done(long future); + private native Variable __fetch_get(long future); + private native Variable __fetch_get_timeout( + long future, long timeout, TimeUnit unit); + private native void __fetch_finalize(long future); + + private class StoreFuture implements Future { + + public StoreFuture(Variable variable) { + future = __store(variable); + } + + @Override + public native boolean cancel(boolean mayInterruptIfRunning); + + @Override + public native boolean isCancelled(); + + @Override + public native boolean isDone(); + + @Override + public native Variable get() + throws InterruptedException, ExecutionException; + + @Override + public native Variable get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + + @Override + protected native void finalize(); + + private long future; + } + + private native long __store(Variable variable); + + // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track + // the java object references correctly. (MESOS-2161). The above 'StoreFuture' + // class fixes this bug. We leave the below functions for backwards + // compatibility. 
+ private native boolean __store_cancel(long future); + private native boolean __store_is_cancelled(long future); + private native boolean __store_is_done(long future); + private native Variable __store_get(long future); + private native Variable __store_get_timeout( + long future, long timeout, TimeUnit unit); + private native void __store_finalize(long future); + + private class ExpungeFuture implements Future { + + public ExpungeFuture(Variable variable) { + future = __expunge(variable); + } + + @Override + public native boolean cancel(boolean mayInterruptIfRunning); + + @Override + public native boolean isCancelled(); + + @Override + public native boolean isDone(); + + @Override + public native Boolean get() + throws InterruptedException, ExecutionException; + + @Override + public native Boolean get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + + @Override + protected native void finalize(); + + private long future; + } + + private native long __expunge(Variable variable); + + // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track + // the java object references correctly. (MESOS-2161). The above + // 'ExpungeFuture' class fixes this bug. We leave the below functions for + // backwards compatibility. 
+ private native boolean __expunge_cancel(long future); + private native boolean __expunge_is_cancelled(long future); + private native boolean __expunge_is_done(long future); + private native Boolean __expunge_get(long future); + private native Boolean __expunge_get_timeout( + long future, long timeout, TimeUnit unit); + private native void __expunge_finalize(long future); + + private class NamesFuture implements Future> { + + public NamesFuture() { + future = __names(); + } + + @Override + public native boolean cancel(boolean mayInterruptIfRunning); + + @Override + public native boolean isCancelled(); + + @Override + public native boolean isDone(); + + @Override + public native Iterator get() + throws InterruptedException, ExecutionException; + + @Override + public native Iterator get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; + + @Override + protected native void finalize(); + + private long future; + } + + private native long __names(); + + // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track + // the java object references correctly. (MESOS-2161). The above 'NamesFuture' + // class fixes this bug. We leave the below functions for backwards + // compatibility. 
+ private native boolean __names_cancel(long future); + private native boolean __names_is_cancelled(long future); + private native boolean __names_is_done(long future); + private native Iterator __names_get(long future); + private native Iterator __names_get_timeout( + long future, long timeout, TimeUnit unit); + private native void __names_finalize(long future); + + private long __storage; + private long __state; + + private final static MesosNativeLibrary.Version MESOS_2161_JIRA_FIX_VERSION = + new MesosNativeLibrary.Version(0, 22, 1); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java b/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java new file mode 100644 index 0000000..ebc82cc --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mesos.state; + +import java.util.Iterator; +import java.util.UUID; + +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +/** + * An in-memory implementation of state. + */ +public class InMemoryState implements State { + @Override + public Future fetch(String name) { + Entry entry = entries.get(name); // Is null if doesn't exist. + + if (entry == null) { + entry = new Entry(); + entry.name = name; + entry.uuid = UUID.randomUUID(); + entry.value = new byte[0]; + + // We use 'putIfAbsent' because multiple threads might be + // attempting to fetch a "new" variable at the same time. + if (entries.putIfAbsent(name, entry) != null) { + return fetch(name); + } + } + + assert entry != null; + + return futureFrom((Variable) new InMemoryVariable(entry)); + } + + @Override + public Future store(Variable v) { + InMemoryVariable variable = (InMemoryVariable) v; + + Entry entry = new Entry(); + entry.name = variable.entry.name; + entry.uuid = UUID.randomUUID(); + entry.value = variable.value; + + if (entries.replace(entry.name, variable.entry, entry)) { + return futureFrom((Variable) new InMemoryVariable(entry)); + } + + return futureFrom((Variable) null); + } + + @Override + public Future expunge(Variable v) { + InMemoryVariable variable = (InMemoryVariable) v; + + return futureFrom(entries.remove(variable.entry.name, variable.entry)); + } + + @Override + public Future> names() { + return futureFrom(entries.keySet().iterator()); + } + + private static class InMemoryVariable extends Variable { + private InMemoryVariable(Entry entry) { + this(entry, null); + } + + private InMemoryVariable(Entry entry, byte[] value) { + this.entry = entry; + this.value = value; + } + + @Override + public byte[] value() { + if (this.value != null) { + return this.value; + } else { + return this.entry.value; + } + } 
+ + @Override + public Variable mutate(byte[] value) { + return new InMemoryVariable(entry, value); + } + + final Entry entry; + final byte[] value; + } + + private static class Entry { + @Override + public boolean equals(Object that) { + if (that instanceof Entry) { + return uuid.equals(((Entry) that).uuid); + } + + return false; + } + + @Override + public int hashCode() { + return uuid.hashCode(); + } + + String name; + UUID uuid; + byte[] value; + } + + private static Future futureFrom(final T t) { + FutureTask future = new FutureTask(new Callable() { + public T call() { + return t; + }}); + future.run(); + return future; + } + + private final ConcurrentMap entries = + new ConcurrentHashMap(); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java b/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java new file mode 100644 index 0000000..d462dd7 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos.state; + +/** + * Implementation of State that uses LevelDB to store + * variables/values. + */ +public class LevelDBState extends AbstractState { + /** + * Constructs a new instance of LevelDBState. + * + * @param path Absolute path to database. + */ + public LevelDBState(String path) { + initialize(path); + } + + protected native void initialize(String path); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java b/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java new file mode 100644 index 0000000..7df56b5 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos.state; + +import java.util.concurrent.TimeUnit; + +/** + * Implementation of State that uses a replicated log to store + * variables/values. 
+ */ +public class LogState extends AbstractState { + /** + * Constructs a new instance of LogState. + * + * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'. + * @param timeout ZooKeeper session timeout. + * @param unit Unit for session timeout. + * @param znode Path to znode where log replicas should be found. + * @param quorum Number of replicas necessary to persist a write. + * @param path Path the local replica uses to read/write data. + */ + public LogState(String servers, + long timeout, + TimeUnit unit, + String znode, + long quorum, + String path) { + initialize(servers, timeout, unit, znode, quorum, path, 0); + } + + /** + * Constructs a new instance of LogState. + * + * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'. + * @param timeout ZooKeeper session timeout. + * @param unit Unit for session timeout. + * @param znode Path to znode where log replicas should be found. + * @param quorum Number of replicas necessary to persist a write. + * @param path Path the local replica uses to read/write data. + * @param diffsBetweenSnapshots Number of diffs to write between snapshots. 
+ */ + public LogState(String servers, + long timeout, + TimeUnit unit, + String znode, + long quorum, + String path, + int diffsBetweenSnapshots) { + initialize(servers, timeout, unit, znode, quorum, path, diffsBetweenSnapshots); + } + + protected native void initialize(String servers, + long timeout, + TimeUnit unit, + String znode, + long quorum, + String path, + int diffsBetweenSnapshots); + + protected native void finalize(); + + private long __log; +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/State.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/State.java b/myriad-commons/src/main/java/org/apache/mesos/state/State.java new file mode 100644 index 0000000..3050401 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/State.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mesos.state; + +import java.util.Iterator; + +import java.util.concurrent.Future; + +/** + * An abstraction of "state" (possibly between multiple distributed + * components) represented by "variables" (effectively key/value + * pairs). Variables are versioned such that setting a variable in the + * state will only succeed if the variable has not changed since last + * fetched. Varying implementations of state provide varying + * replicated guarantees. + *
<p>
+ * Note that the semantics of 'fetch' and 'store' provide + * atomicity. That is, you cannot store a variable that has changed + * since you did the last fetch. That is, if a store succeeds then no + * other writes have been performed on the variable since your fetch. + * + * Example: + *
<pre>
+ * {@code
+ *   State state = new ZooKeeperState();
+ *   Future<Variable> variable = state.fetch("machines");
+ *   Variable machines = variable.get();
+ *   machines = machines.mutate(...);
+ *   variable = state.store(machines);
+ *   machines = variable.get();
+ * }
+ * </pre>
+ */ +public interface State { + /** + * Returns an immutable "variable" representing the current value + * from the state associated with the specified name. + * + * @param name The name of the variable. + * + * @return A future of the variable. + * + * @see Variable + */ + Future fetch(String name); + + /** + * Returns an immutable "variable" representing the current value in + * the state if updating the specified variable in the state was + * successful, otherwise returns null. + * + * @param variable The variable to be stored. + * + * @return A future of a variable with the new value on success, + * or null on failure. + * + * @see Variable + */ + Future store(Variable variable); + + /** + * Returns true if successfully expunged the variable from the state + * or false if the variable did not exist or was no longer valid. + * + * @param variable The variable to be expunged. + * + * @return A future of true on success, false on failure. + * + * @see Variable + */ + Future expunge(Variable variable); + + /** + * Returns an iterator of variable names in the state. + * + * @return A future of an iterator over all variable names in the state. + */ + Future> names(); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java b/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java new file mode 100644 index 0000000..0be3e3d --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos.state; + +/** + * Represents a key/value pair stored in the {@link State}. Variables + * are versioned such that setting a variable in the state will only + * succeed if the variable has not changed since last fetched. + */ +public class Variable { + protected Variable() {} + + /** + * Returns the current value of this variable. + * + * @return The current value. + */ + public native byte[] value(); + + /** + * Updates the current value of this variable. + * + * @param value The new value. + * + * @return A variable representing the new value. + */ + public native Variable mutate(byte[] value); + + protected native void finalize(); + + private long __variable; +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java b/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java new file mode 100644 index 0000000..4855236 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos.state; + +import java.util.concurrent.TimeUnit; + +/** + * Implementation of State that uses ZooKeeper to store + * variables/values. Note that this means the values associated with + * variables cannot be more than 1 MB (actually slightly less since + * we store some bookkeeping information). + */ +public class ZooKeeperState extends AbstractState { + /** + * Constructs a new instance of ZooKeeperState. + * + * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'. + * @param timeout ZooKeeper session timeout. + * @param unit Unit for session timeout. + * @param znode Path to znode where "state" should be rooted. + */ + public ZooKeeperState(String servers, + long timeout, + TimeUnit unit, + String znode) { + initialize(servers, timeout, unit, znode); + } + + /** + * Constructs a new instance of ZooKeeperState. + * + * @param servers List of ZooKeeper servers (e.g., 'ip1:port1,ip2:port2'). + * @param timeout ZooKeeper session timeout. + * @param unit Unit for session timeout. + * @param znode Path to znode where "state" should be rooted. + * @param scheme Authentication scheme (e.g., "digest"). + * @param credentials Authentication credentials (e.g., "user:pass"). 
+ */ + public ZooKeeperState(String servers, + long timeout, + TimeUnit unit, + String znode, + String scheme, + byte[] credentials) { + initialize(servers, timeout, unit, znode, scheme, credentials); + } + + protected native void initialize(String servers, + long timeout, + TimeUnit unit, + String znode); + + protected native void initialize(String servers, + long timeout, + TimeUnit unit, + String znode, + String scheme, + byte[] credentials); +}