myriad-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jpgilabe...@apache.org
Subject [06/26] incubator-myriad git commit: Upgrade mesos driver to Mesos 1.5 with protobuf 2.5
Date Wed, 12 Sep 2018 15:52:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java b/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java
new file mode 100644
index 0000000..8ac1498
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/AbstractState.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.Iterator;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mesos.MesosNativeLibrary;
+
+/**
+ * Abstract implementation of State that provides operations on
+ * futures to make concrete classes easier to create.
+ */
+public abstract class AbstractState implements State {
+    static {
+        MesosNativeLibrary.load();
+    }
+
+    @Override
+    public Future<Variable> fetch(final String name) {
+        if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) {
+            return new FetchFuture(name);
+        }
+
+        // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161).
+        final long future = __fetch(name); // Asynchronously start the operation.
+        return new Future<Variable>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                if (mayInterruptIfRunning) {
+                    return __fetch_cancel(future);
+                }
+                return false; // Should not interrupt and already running (or finished).
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return __fetch_is_cancelled(future);
+            }
+
+            @Override
+            public boolean isDone() {
+                return __fetch_is_done(future);
+            }
+
+            @Override
+            public Variable get() throws InterruptedException, ExecutionException {
+                return __fetch_get(future);
+            }
+
+            @Override
+            public Variable get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+                return __fetch_get_timeout(future, timeout, unit);
+            }
+
+            @Override
+            protected void finalize() {
+                __fetch_finalize(future);
+            }
+        };
+    }
+
+    @Override
+    public Future<Variable> store(Variable variable) {
+        if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) {
+            return new StoreFuture(variable);
+        }
+
+        // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161).
+        final long future = __store(variable); // Asynchronously start the operation.
+        return new Future<Variable>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                if (mayInterruptIfRunning) {
+                    return __store_cancel(future);
+                }
+                return false; // Should not interrupt and already running (or finished).
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return __store_is_cancelled(future);
+            }
+
+            @Override
+            public boolean isDone() {
+                return __store_is_done(future);
+            }
+
+            @Override
+            public Variable get() throws InterruptedException, ExecutionException {
+                return __store_get(future);
+            }
+
+            @Override
+            public Variable get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+                return __store_get_timeout(future, timeout, unit);
+            }
+
+            @Override
+            protected void finalize() {
+                __store_finalize(future);
+            }
+        };
+    }
+
+    @Override
+    public Future<Boolean> expunge(Variable variable) {
+        if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) {
+            return new ExpungeFuture(variable);
+        }
+
+        // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161).
+        final long future = __expunge(variable); // Asynchronously start the operation.
+        return new Future<Boolean>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                if (mayInterruptIfRunning) {
+                    return __expunge_cancel(future);
+                }
+                return false; // Should not interrupt and already running (or finished).
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return __expunge_is_cancelled(future);
+            }
+
+            @Override
+            public boolean isDone() {
+                return __expunge_is_done(future);
+            }
+
+            @Override
+            public Boolean get() throws InterruptedException, ExecutionException {
+                return __expunge_get(future);
+            }
+
+            @Override
+            public Boolean get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+                return __expunge_get_timeout(future, timeout, unit);
+            }
+
+            @Override
+            protected void finalize() {
+                __expunge_finalize(future);
+            }
+        };
+    }
+
+    public Future<Iterator<String>> names() {
+        if (!MesosNativeLibrary.version().before(MESOS_2161_JIRA_FIX_VERSION)) {
+            return new NamesFuture();
+        }
+
+        // TODO(jmlvanre): Deprecate anonymous future in 0.24 (MESOS-2161).
+        final long future = __names(); // Asynchronously start the operation.
+        return new Future<Iterator<String>>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                if (mayInterruptIfRunning) {
+                    return __names_cancel(future);
+                }
+                return false; // Should not interrupt and already running (or finished).
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return __names_is_cancelled(future);
+            }
+
+            @Override
+            public boolean isDone() {
+                return __names_is_done(future);
+            }
+
+            @Override
+            public Iterator<String> get() throws  InterruptedException,
+                    ExecutionException {
+                return __names_get(future);
+            }
+
+            @Override
+            public Iterator<String> get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, TimeoutException {
+                return __names_get_timeout(future, timeout, unit);
+            }
+
+            @Override
+            protected void finalize() {
+                __names_finalize(future);
+            }
+        };
+    }
+
+    protected native void finalize();
+
+    // Native implementations of 'fetch', 'store', 'expunge', and 'names'. We wrap
+    // them in classes to carry the java references correctly through the JNI
+    // bindings (MESOS-2161). The native functions in AbstractState will be
+    // deprecated in 0.24.
+
+    private class FetchFuture implements Future<Variable> {
+
+        public FetchFuture(String name) {
+            future = __fetch(name);
+        }
+
+        @Override
+        public native boolean cancel(boolean mayInterruptIfRunning);
+
+        @Override
+        public native boolean isCancelled();
+
+        @Override
+        public native boolean isDone();
+
+        @Override
+        public native Variable get()
+                throws InterruptedException, ExecutionException;
+
+        @Override
+        public native Variable get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException;
+
+        @Override
+        protected native void finalize();
+
+        private long future;
+    }
+
+    private native long __fetch(String name);
+
+    // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track
+    // the java object references correctly. (MESOS-2161). The above 'FetchFuture'
+    // class fixes this bug. We leave the below functions for backwards
+    // compatibility.
+    private native boolean __fetch_cancel(long future);
+    private native boolean __fetch_is_cancelled(long future);
+    private native boolean __fetch_is_done(long future);
+    private native Variable __fetch_get(long future);
+    private native Variable __fetch_get_timeout(
+            long future, long timeout, TimeUnit unit);
+    private native void __fetch_finalize(long future);
+
+    private class StoreFuture implements Future<Variable> {
+
+        public StoreFuture(Variable variable) {
+            future = __store(variable);
+        }
+
+        @Override
+        public native boolean cancel(boolean mayInterruptIfRunning);
+
+        @Override
+        public native boolean isCancelled();
+
+        @Override
+        public native boolean isDone();
+
+        @Override
+        public native Variable get()
+                throws InterruptedException, ExecutionException;
+
+        @Override
+        public native Variable get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException;
+
+        @Override
+        protected native void finalize();
+
+        private long future;
+    }
+
+    private native long __store(Variable variable);
+
+    // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track
+    // the java object references correctly. (MESOS-2161). The above 'StoreFuture'
+    // class fixes this bug. We leave the below functions for backwards
+    // compatibility.
+    private native boolean __store_cancel(long future);
+    private native boolean __store_is_cancelled(long future);
+    private native boolean __store_is_done(long future);
+    private native Variable __store_get(long future);
+    private native Variable __store_get_timeout(
+            long future, long timeout, TimeUnit unit);
+    private native void __store_finalize(long future);
+
+    private class ExpungeFuture implements Future<Boolean> {
+
+        public ExpungeFuture(Variable variable) {
+            future = __expunge(variable);
+        }
+
+        @Override
+        public native boolean cancel(boolean mayInterruptIfRunning);
+
+        @Override
+        public native boolean isCancelled();
+
+        @Override
+        public native boolean isDone();
+
+        @Override
+        public native Boolean get()
+                throws InterruptedException, ExecutionException;
+
+        @Override
+        public native Boolean get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException;
+
+        @Override
+        protected native void finalize();
+
+        private long future;
+    }
+
+    private native long __expunge(Variable variable);
+
+    // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track
+    // the java object references correctly. (MESOS-2161). The above
+    // 'ExpungeFuture' class fixes this bug. We leave the below functions for
+    // backwards compatibility.
+    private native boolean __expunge_cancel(long future);
+    private native boolean __expunge_is_cancelled(long future);
+    private native boolean __expunge_is_done(long future);
+    private native Boolean __expunge_get(long future);
+    private native Boolean __expunge_get_timeout(
+            long future, long timeout, TimeUnit unit);
+    private native void __expunge_finalize(long future);
+
+    private class NamesFuture implements Future<Iterator<String>> {
+
+        public NamesFuture() {
+            future = __names();
+        }
+
+        @Override
+        public native boolean cancel(boolean mayInterruptIfRunning);
+
+        @Override
+        public native boolean isCancelled();
+
+        @Override
+        public native boolean isDone();
+
+        @Override
+        public native Iterator<String> get()
+                throws InterruptedException, ExecutionException;
+
+        @Override
+        public native Iterator<String> get(long timeout, TimeUnit unit)
+                throws InterruptedException, ExecutionException, TimeoutException;
+
+        @Override
+        protected native void finalize();
+
+        private long future;
+    }
+
+    private native long __names();
+
+    // TODO(jmlvanre): Deprecate below functions in 0.24 because we can't track
+    // the java object references correctly. (MESOS-2161). The above 'NamesFuture'
+    // class fixes this bug. We leave the below functions for backwards
+    // compatibility.
+    private native boolean __names_cancel(long future);
+    private native boolean __names_is_cancelled(long future);
+    private native boolean __names_is_done(long future);
+    private native Iterator<String> __names_get(long future);
+    private native Iterator<String> __names_get_timeout(
+            long future, long timeout, TimeUnit unit);
+    private native void __names_finalize(long future);
+
+    private long __storage;
+    private long __state;
+
+    private final static MesosNativeLibrary.Version MESOS_2161_JIRA_FIX_VERSION =
+            new MesosNativeLibrary.Version(0, 22, 1);
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java b/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java
new file mode 100644
index 0000000..ebc82cc
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/InMemoryState.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+/**
+ * An in-memory implementation of state.
+ */
+public class InMemoryState implements State {
+    @Override
+    public Future<Variable> fetch(String name) {
+        Entry entry = entries.get(name); // Is null if doesn't exist.
+
+        if (entry == null) {
+            entry = new Entry();
+            entry.name = name;
+            entry.uuid = UUID.randomUUID();
+            entry.value = new byte[0];
+
+            // We use 'putIfAbsent' because multiple threads might be
+            // attempting to fetch a "new" variable at the same time.
+            if (entries.putIfAbsent(name, entry) != null) {
+                return fetch(name);
+            }
+        }
+
+        assert entry != null;
+
+        return futureFrom((Variable) new InMemoryVariable(entry));
+    }
+
+    @Override
+    public Future<Variable> store(Variable v) {
+        InMemoryVariable variable = (InMemoryVariable) v;
+
+        Entry entry = new Entry();
+        entry.name = variable.entry.name;
+        entry.uuid = UUID.randomUUID();
+        entry.value = variable.value;
+
+        if (entries.replace(entry.name, variable.entry, entry)) {
+            return futureFrom((Variable) new InMemoryVariable(entry));
+        }
+
+        return futureFrom((Variable) null);
+    }
+
+    @Override
+    public Future<Boolean> expunge(Variable v) {
+        InMemoryVariable variable = (InMemoryVariable) v;
+
+        return futureFrom(entries.remove(variable.entry.name, variable.entry));
+    }
+
+    @Override
+    public Future<Iterator<String>> names() {
+        return futureFrom(entries.keySet().iterator());
+    }
+
+    private static class InMemoryVariable extends Variable {
+        private InMemoryVariable(Entry entry) {
+            this(entry, null);
+        }
+
+        private InMemoryVariable(Entry entry, byte[] value) {
+            this.entry = entry;
+            this.value = value;
+        }
+
+        @Override
+        public byte[] value() {
+            if (this.value != null) {
+                return this.value;
+            } else {
+                return this.entry.value;
+            }
+        }
+
+        @Override
+        public Variable mutate(byte[] value) {
+            return new InMemoryVariable(entry, value);
+        }
+
+        final Entry entry;
+        final byte[] value;
+    }
+
+    private static class Entry {
+        @Override
+        public boolean equals(Object that) {
+            if (that instanceof Entry) {
+                return uuid.equals(((Entry) that).uuid);
+            }
+
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return uuid.hashCode();
+        }
+
+        String name;
+        UUID uuid;
+        byte[] value;
+    }
+
+    private static <T> Future<T> futureFrom(final T t) {
+        FutureTask<T> future = new FutureTask<T>(new Callable<T>() {
+            public T call() {
+                return t;
+            }});
+        future.run();
+        return future;
+    }
+
+    private final ConcurrentMap<String, Entry> entries =
+            new ConcurrentHashMap<String, Entry>();
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java b/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java
new file mode 100644
index 0000000..d462dd7
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/LevelDBState.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+/**
+ * Implementation of State that uses LevelDB to store
+ * variables/values.
+ */
+public class LevelDBState extends AbstractState {
+    /**
+     * Constructs a new instance of LevelDBState.
+     *
+     * @param path  Absolute path to database.
+     */
+    public LevelDBState(String path) {
+        initialize(path);
+    }
+
+    protected native void initialize(String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java b/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java
new file mode 100644
index 0000000..7df56b5
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/LogState.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of State that uses a replicated log to store
+ * variables/values.
+ */
+public class LogState extends AbstractState {
+    /**
+     * Constructs a new instance of LogState.
+     *
+     * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'.
+     * @param timeout ZooKeeper session timeout.
+     * @param unit    Unit for session timeout.
+     * @param znode   Path to znode where log replicas should be found.
+     * @param quorum  Number of replicas necessary to persist a write.
+     * @param path    Path the local replica uses to read/write data.
+     */
+    public LogState(String servers,
+                    long timeout,
+                    TimeUnit unit,
+                    String znode,
+                    long quorum,
+                    String path) {
+        initialize(servers, timeout, unit, znode, quorum, path, 0);
+    }
+
+    /**
+     * Constructs a new instance of LogState.
+     *
+     * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'.
+     * @param timeout ZooKeeper session timeout.
+     * @param unit    Unit for session timeout.
+     * @param znode   Path to znode where log replicas should be found.
+     * @param quorum  Number of replicas necessary to persist a write.
+     * @param path    Path the local replica uses to read/write data.
+     * @param diffsBetweenSnapshots Number of diffs to write between snapshots.
+     */
+    public LogState(String servers,
+                    long timeout,
+                    TimeUnit unit,
+                    String znode,
+                    long quorum,
+                    String path,
+                    int diffsBetweenSnapshots) {
+        initialize(servers, timeout, unit, znode, quorum, path, diffsBetweenSnapshots);
+    }
+
+    protected native void initialize(String servers,
+                                     long timeout,
+                                     TimeUnit unit,
+                                     String znode,
+                                     long quorum,
+                                     String path,
+                                     int diffsBetweenSnapshots);
+
+    protected native void finalize();
+
+    private long __log;
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/State.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/State.java b/myriad-commons/src/main/java/org/apache/mesos/state/State.java
new file mode 100644
index 0000000..3050401
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/State.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.Iterator;
+
+import java.util.concurrent.Future;
+
+/**
+ * An abstraction of "state" (possibly between multiple distributed
+ * components) represented by "variables" (effectively key/value
+ * pairs). Variables are versioned such that setting a variable in the
+ * state will only succeed if the variable has not changed since last
+ * fetched. Varying implementations of state provide varying
+ * replicated guarantees.
+ * <p>
+ * Note that the semantics of 'fetch' and 'store' provide
+ * atomicity. That is, you cannot store a variable that has changed
+ * since you did the last fetch. That is, if a store succeeds then no
+ * other writes have been performed on the variable since your fetch.
+ *
+ * Example:
+ * <pre>
+ * {@code
+ *   State state = new ZooKeeperState();
+ *   Future<Variable> variable = state.fetch("machines");
+ *   Variable machines = variable.get();
+ *   machines = machines.mutate(...);
+ *   variable = state.store(machines);
+ *   machines = variable.get();
+ * }
+ * </pre>
+ */
+public interface State {
+    /**
+     * Returns an immutable "variable" representing the current value
+     * from the state associated with the specified name.
+     *
+     * @param name  The name of the variable.
+     *
+     * @return      A future of the variable.
+     *
+     * @see Variable
+     */
+    Future<Variable> fetch(String name);
+
+    /**
+     * Returns an immutable "variable" representing the current value in
+     * the state if updating the specified variable in the state was
+     * successful, otherwise returns null.
+     *
+     * @param variable  The variable to be stored.
+     *
+     * @return          A future of a variable with the new value on success,
+     *                  or null on failure.
+     *
+     * @see Variable
+     */
+    Future<Variable> store(Variable variable);
+
+    /**
+     * Returns true if successfully expunged the variable from the state
+     * or false if the variable did not exist or was no longer valid.
+     *
+     * @param variable  The variable to be expunged.
+     *
+     * @return          A future of true on success, false on failure.
+     *
+     * @see Variable
+     */
+    Future<Boolean> expunge(Variable variable);
+
+    /**
+     * Returns an iterator of variable names in the state.
+     *
+     * @return A future of an iterator over all variable names in the state.
+     */
+    Future<Iterator<String>> names();
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java b/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java
new file mode 100644
index 0000000..0be3e3d
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/Variable.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+/**
+ * Represents a key/value pair stored in the {@link State}. Variables
+ * are versioned such that setting a variable in the state will only
+ * succeed if the variable has not changed since last fetched.
+ */
+public class Variable {
+    protected Variable() {}
+
+    /**
+     * Returns the current value of this variable.
+     *
+     * @return The current value.
+     */
+    public native byte[] value();
+
+    /**
+     * Updates the current value of this variable.
+     *
+     * @param value The new value.
+     *
+     * @return      A variable representing the new value.
+     */
+    public native Variable mutate(byte[] value);
+
+    protected native void finalize();
+
+    private long __variable;
+}

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java
----------------------------------------------------------------------
diff --git a/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java b/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java
new file mode 100644
index 0000000..4855236
--- /dev/null
+++ b/myriad-commons/src/main/java/org/apache/mesos/state/ZooKeeperState.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of State that uses ZooKeeper to store
+ * variables/values. Note that this means the values associated with
+ * variables cannot be more than 1 MB (actually slightly less since
+ * we store some bookkeeping information).
+ */
+public class ZooKeeperState extends AbstractState {
+    /**
+     * Constructs a new instance of ZooKeeperState.
+     *
+     * @param servers List of ZooKeeper servers, e.g., 'ip1:port1,ip2:port2'.
+     * @param timeout ZooKeeper session timeout.
+     * @param unit    Unit for session timeout.
+     * @param znode   Path to znode where "state" should be rooted.
+     */
+    public ZooKeeperState(String servers,
+                          long timeout,
+                          TimeUnit unit,
+                          String znode) {
+        initialize(servers, timeout, unit, znode);
+    }
+
+    /**
+     * Constructs a new instance of ZooKeeperState.
+     *
+     * @param servers     List of ZooKeeper servers (e.g., 'ip1:port1,ip2:port2').
+     * @param timeout     ZooKeeper session timeout.
+     * @param unit        Unit for session timeout.
+     * @param znode       Path to znode where "state" should be rooted.
+     * @param scheme      Authentication scheme (e.g., "digest").
+     * @param credentials Authentication credentials (e.g., "user:pass").
+     */
+    public ZooKeeperState(String servers,
+                          long timeout,
+                          TimeUnit unit,
+                          String znode,
+                          String scheme,
+                          byte[] credentials) {
+        initialize(servers, timeout, unit, znode, scheme, credentials);
+    }
+
+    protected native void initialize(String servers,
+                                     long timeout,
+                                     TimeUnit unit,
+                                     String znode);
+
+    protected native void initialize(String servers,
+                                     long timeout,
+                                     TimeUnit unit,
+                                     String znode,
+                                     String scheme,
+                                     byte[] credentials);
+}


Mime
View raw message