river-dev mailing list archives

From Peter Firmstone <j...@zeus.net.au>
Subject Re: TaskManager progress
Date Sun, 25 Jul 2010 08:32:15 GMT
Something I need to consider is that I intend to support the Java CDC 1.11
Personal Basis Profile. This will be a subset of the Jini API using JERI in
place of Java RMI, only to allow simple service participation. There's no
intent to have all the existing Service implementations run on it, just the
basics.

CDC enables support for Blu-ray players, the Amazon Kindle and TV set-top
boxes. No phones unfortunately, that seems to have died. The existing
ServiceUI enables support of various user interfaces for different
platforms.

For that reason, we should probably prefer com.sun.jini.thread.Executor
over java.util.concurrent.Executor in any API, since java.util.concurrent
isn't part of the CDC class library.

One implementation can map to the other.
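
A minimal sketch of that mapping, assuming com.sun.jini.thread.Executor
boils down to a single execute(Runnable, String) method. NamedExecutor
below is a hypothetical stand-in for it so the example compiles on its own,
and both adapter class names are made up for illustration:

```java
import java.util.concurrent.Executor;

// Hypothetical stand-in for com.sun.jini.thread.Executor. The single
// execute(Runnable, String) method is an assumption about its shape.
interface NamedExecutor {
    void execute(Runnable task, String name);
}

// Presents a java.util.concurrent.Executor as a NamedExecutor, for
// platforms where java.util.concurrent is available.
final class ConcurrentToNamed implements NamedExecutor {
    private final Executor delegate;

    ConcurrentToNamed(Executor delegate) {
        this.delegate = delegate;
    }

    public void execute(Runnable task, String name) {
        // The name hint has no equivalent on the other side, so it is dropped.
        delegate.execute(task);
    }
}

// Presents a NamedExecutor as a java.util.concurrent.Executor, using one
// fixed name for every task it forwards.
final class NamedToConcurrent implements Executor {
    private final NamedExecutor delegate;
    private final String name;

    NamedToConcurrent(NamedExecutor delegate, String name) {
        this.delegate = delegate;
        this.name = name;
    }

    public void execute(Runnable command) {
        delegate.execute(command, name);
    }
}
```

An API written against the named interface stays usable on CDC, and the
second adapter is only needed where java.util.concurrent exists anyway.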

Cheers,

Peter.

Peter Firmstone wrote:
> Hi Patricia,
>
> I don't even know if the following compiles. The intent was to make it
> possible for a RunnableDep to be depended on remotely, allowing all
> sorts of dependant Runnables to sort out their differences.  I also
> envisioned a singleton RunnableManager for a JVM instance to enable
> tuning the number of threads.
>
> In this case RunnableDep doesn't extend or implement Runnable.  The 
> class RunnableDepHelper is designed to be encapsulated by a 
> RunnableDep implementation.  The RunnableDepHelper executes the 
> RunnableDep via execute() and captures any exceptions, delivering them 
> to other dependant RunnableDeps.  They can then decide if the 
> Exception will prevent them from running and retry if possible.
>
> No RunnableDep is ever submitted to the RunnableManager until it is
> ready to execute. It is also compatible with Remote implementations,
> although non-Remote and Remote instances can't be mixed, at least with
> the current design.  By allowing Remote RunnableDeps, the potential
> is there to allow proxies to execute code locally or remotely.
> See below. It's all very rough and unfinished, so watch out for
> synchronization lock gotchas; this hasn't had any testing or proper
> analysis.
>
> I think for the current implementations where Tasks had no 
> dependencies, we should simply implement Runnable, not RunnableDep.
>
> You might want to rename these classes, perhaps to SequencedExecutable;
> I think Sequencer explains it better.
>
>
> /*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> *      http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
>
> package org.apache.river.imp.util;
>
> /**
> *
> * @author Peter Firmstone
> */
> public interface RunnableDep {
>    void addPreceeding(RunnableDep happensBefore) throws Exception;
>    void addDependant(RunnableDep happensAfter) throws Exception;
>    void notify(RunnableDep isComplete, Exception ex, int status)
>            throws Exception;
>    void prime() throws Exception;
>    void execute() throws Exception;
>    void abort() throws Exception;
>    void retry() throws Exception;
>    int status() throws Exception;
> }
>
> package org.apache.river.imp.util;
>
> import java.util.concurrent.Executor;
>
> /**
> * Singleton Executor to control number of threads used.
> * @author Peter Firmstone.
> */
> public class RunnableManager implements Executor {
>    private static final Executor systemExecutor = new RunnableManager();
>
>    static Executor getExecutor() {
>        return systemExecutor;
>    }
>
>    private RunnableManager() {}
>
>    public void execute(Runnable command) {
>        throw new UnsupportedOperationException("Not supported yet.");
>    }
> }
>
>
> package org.apache.river.imp.util;
>
> import java.util.HashSet;
> import java.util.Iterator;
> import java.util.Set;
>
> /**
> * RunnableDepHelper is a runtime object that implements most of the
> * functionality of RunnableDep. It is intended to be encapsulated by
> * the implementer of RunnableDep.  The encapsulating object, called the
> * master, must be passed into the constructor.  The master is passed to
> * preceding and dependant tasks.
> *
> * The only method the master needs to implement directly is execute(),
> * which is equivalent to run() but may throw an exception. The exception
> * thrown will be stored by the RunnableDepHelper.
> *
> * The master may intercept calls to alter behaviour. If, for instance,
> * the order of tasks is important but a precedent that throws an
> * exception needn't prevent the master from running, the master may set
> * the exception to null before handing the notify call to
> * RunnableDepHelper, which will remove the preceding task from its
> * internal list.  If notify is intercepted, the exception thrown by prime
> * may be ignored; if the status is 0, prime may be called again until no
> * exception is thrown.
> *
> * The master need only implement RunnableDep; Runnable is implemented by
> * the RunnableDepHelper on behalf of the master.
> *
> * The master must implement abort if it can abort, and then call abort on
> * RunnableDepHelper.  If abort is not supported it should throw an
> * Exception.
> *
> * @author Peter Firmstone.
> */
> public class RunnableDepHelper implements RunnableDep, Runnable {
>    private final RunnableDep master;
>    private final Set<RunnableDep> dependants;
>    private final Set<RunnableDep> precedents;
>    private volatile Exception thrown;
>    private volatile int status;
>
>    public RunnableDepHelper(RunnableDep master){
>        this.master = master;
>        dependants = new HashSet<RunnableDep>(3);
>        precedents = new HashSet<RunnableDep>(3);
>        thrown = null;
>        status = 0;
>    }
>
>    public void addPreceeding(RunnableDep before) throws Exception {
>        boolean added = false;
>        synchronized (precedents){
>            added = precedents.add(before);
>        }
>        if (added){ before.addDependant(master);}
>    }
>
>    public void addDependant(RunnableDep after) throws Exception {
>        boolean added = false;
>        if (thrown != null) throw thrown;
>        if (status == -1) {
>            throw new Exception("RunnableDep has failed state");
>        }
>        synchronized (dependants){
>            if ( status == 0 ){
>                added = dependants.add(after);
>            }
>        }
>        if (added){
>            after.addPreceeding(master);
>        }
>        if (status == 1){
>            try {
>                after.notify(master, null, status);
>                // once notified of successful completion dependants
>                // are notified and removed.
>                synchronized (dependants){
>                    dependants.remove(after);
>                }
>            } catch (Exception ex) {
>                // exception swallowed, the dependant can check itself.
>            }
>        }
>    }
>
>    public void notify(RunnableDep complete, Exception ex, int status)
>            throws Exception {
>        boolean empty = false;
>        synchronized (precedents){
>            if ( ex == null ) {
>                if ( status == 1 ) {
>                    precedents.remove(complete);
>                }
>                empty = precedents.isEmpty();
>            } else if ( thrown == null && precedents.contains(complete) ) {
>                thrown = new Exception("Preceding RunnableDep failed " +
>                        "with exception, run aborted", ex);
>                // The status parameter shadows the field, so qualify it.
>                this.status = -1;
>            }
>        }
>        // Check our own state here, not the precedent's reported status.
>        if ( empty && this.status == 0){
>            submit();
>        }
>    }
>
>    public void prime() throws Exception {
>        if (thrown != null){
>            throw thrown;
>        }
>        boolean empty;
>        synchronized (precedents) {
>            empty = precedents.isEmpty();
>        }
>        if (empty){
>            if (status == 0) submit();
>        } else {
>            synchronized (precedents) {
>                Iterator<RunnableDep> pre = precedents.iterator();
>                while (pre.hasNext()){
>                    pre.next().prime();
>                }
>            }
>        }
>    }
>
>    public void abort() throws Exception {
>        status = -1;
>        Exception ex = new Exception("RunnableDep aborted");
>        Set<RunnableDep> deps = null;
>        synchronized (dependants) {
>            deps = new HashSet<RunnableDep>(dependants.size());
>            deps.addAll(dependants);
>        }
>        Iterator<RunnableDep> depCom = deps.iterator();
>        while (depCom.hasNext()) {
>            RunnableDep dependant = depCom.next();
>            try {
>                dependant.notify(master, ex, status);
>                // Don't remove dependant in case of retry.
>            } catch (Exception e){
>                // ignore, dependant can find out later.
>            }
>        }
>    }
>
>    public int status() throws Exception {
>        return status;
>    }
>
>    public void execute() throws Exception {
>        // Do nothing master implements.
>    }
>
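>    public void run() {
>        boolean empty;
>        synchronized (precedents) {
>            empty = precedents.isEmpty();
>        }
>        try {
>            if (!empty) {
>                throw new Exception("Precedents are incomplete");
>            }
>            master.execute();
>            status = 1;
>        } catch (Exception e) {
>            thrown = e;
>            status = -1;
>        }
>        // Deliver the outcome to dependants, as the class comment describes.
>        Set<RunnableDep> deps;
>        synchronized (dependants) {
>            deps = new HashSet<RunnableDep>(dependants);
>        }
>        Iterator<RunnableDep> depCom = deps.iterator();
>        while (depCom.hasNext()) {
>            try {
>                depCom.next().notify(master, thrown, status);
>            } catch (Exception e) {
>                // ignore, dependant can find out later.
>            }
>        }
>    }
>
>    private void submit(){
>        RunnableManager.getExecutor().execute(this);
>    }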
>
>    public void retry() throws Exception {
>        if ( status == 1 ) return;
>        status = 0;
>        thrown = null;
>        Set<RunnableDep> pre;
>        synchronized (precedents){
>            pre = new HashSet<RunnableDep>(precedents.size());
>            pre.addAll(precedents);
>        }
>        Iterator<RunnableDep> prePrime = pre.iterator();
>        while (prePrime.hasNext()){
>            RunnableDep precedent = prePrime.next();
>            if ( precedent.status() == -1){
>                precedent.retry();
>            }
>        }
>        prime();
>    }
>
> }
>
>
> package org.apache.river.imp.util;
>
> import java.util.HashMap;
>
> /**
> *
> * @author peter
> */
> public interface Transaction {
>
>    boolean begin();
>
>    boolean commit();
>
>    HashMap<RunnableDep,Exception> getErrors();
>
>    boolean rollback();
>
> }
>
> package org.apache.river.imp.util;
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.logging.Level;
> import java.util.logging.Logger;
>
> /**
> *
> * @author Peter Firmstone
> */
> public class TransactionUnit implements Transaction, RunnableDep {
>    private HashMap<RunnableDep,Integer> groupList;
>    private boolean locked;
>    private HashMap<RunnableDep,Exception> exceptions;
>    public TransactionUnit (Iterator<RunnableDep> tasks) throws Exception{
>        locked = false;
>        groupList = new HashMap<RunnableDep, Integer>();
>        exceptions = new HashMap<RunnableDep, Exception>();
>        while (tasks.hasNext()){
>            RunnableDep task = tasks.next();
>            task.addDependant(this);
>        }
>        locked = true;
>    }
>
>    public boolean begin(){
>        Iterator<RunnableDep> runnables = groupList.keySet().iterator();
>        while (runnables.hasNext()){
>            try {
>                RunnableDep runnable = runnables.next();
>                runnable.prime();
>            } catch (Exception ex) {
>                Logger.getLogger(TransactionUnit.class.getName())
>                        .log(Level.SEVERE, null, ex);
>                return false;
>            }
>        }
>        return true;
>    }
>
>    public boolean rollback(){
>        Iterator<RunnableDep> runnables = groupList.keySet().iterator();
>        while (runnables.hasNext()){
>            try {
>                RunnableDep runnable = runnables.next();
>                runnable.abort();
>            } catch (Exception ex) {
>                Logger.getLogger(TransactionUnit.class.getName())
>                        .log(Level.SEVERE, null, ex);
>                return false;
>            }
>        }
>        return true;
>    }
>
>    public boolean commit(){
>        int count = 0;
>        Iterator<Integer> results = groupList.values().iterator();
>        while (results.hasNext()){
>            Integer runnable = results.next();
>            count = count + runnable;
>        }
>        if (count == groupList.size()) return true;
>        return false;
>    }
>
>    public HashMap<RunnableDep, Exception> getErrors(){
>        HashMap<RunnableDep, Exception> errors;
>        synchronized (exceptions){
>            errors = new HashMap<RunnableDep, Exception>(exceptions.size());
>            errors.putAll(exceptions);
>        }
>        return errors;
>    }
>
>    public void addPreceeding(RunnableDep happensBefore)
>            throws Exception {
>        if (locked) throw new Exception("Attempt to modify transaction " +
>                "after construction");
>        groupList.put(happensBefore,0);
>    }
>
>    public void addDependant(RunnableDep happensAfter) throws Exception {
>        // No dependant's supported at this stage, should they be?
>        if (locked) throw new Exception("Attempt to modify transaction" +
>                " after construction");
>    }
>
>    public void notify(RunnableDep isComplete, Exception ex, int status)
>            throws Exception {
>        groupList.put(isComplete, status);
>        exceptions.put(isComplete, ex);
>    }
>
>    public void prime() throws Exception {
>        begin();
>    }
>
>    public void execute() throws Exception {
>        begin();
>    }
>
>    public void abort() throws Exception {
>        rollback();
>    }
>
>    public void retry() throws Exception {
>        // this is done first to capture any retry exceptions during rollback.
>        exceptions.clear();
>        rollback();
>        Iterator<RunnableDep> runnables = groupList.keySet().iterator();
>        while (runnables.hasNext()){
>            try {
>                RunnableDep runnable = runnables.next();
>                runnable.retry();
>            } catch (Exception ex) {
>                Logger.getLogger(TransactionUnit.class.getName())
>                        .log(Level.SEVERE, null, ex);
>            }
>        }
>    }
>
>    public int status() throws Exception {
>        if (commit()) return 1;
>        return 0;
>    }
> }
>
>
>
>
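
For what it's worth, the precedent/dependant handshake in the quoted code
can be reduced to a small standalone sketch. Everything in it is
hypothetical: SequencedTask folds RunnableDep and RunnableDepHelper into one
class, it takes any java.util.concurrent.Executor instead of the
RunnableManager singleton, and a single graph-wide lock stands in for a
proper synchronization analysis. It assumes the dependency graph is acyclic
and fully wired before anything is primed.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-in for the RunnableDep/RunnableDepHelper pair.
// Status values follow the quoted code: 0 = pending, 1 = completed, -1 = failed.
final class SequencedTask implements Runnable {
    // One lock for the whole graph: coarse, but it avoids lock-ordering problems.
    private static final Object GRAPH = new Object();

    private final Runnable body;
    private final Executor executor;
    private final Set<SequencedTask> precedents = new LinkedHashSet<SequencedTask>();
    private final Set<SequencedTask> dependants = new LinkedHashSet<SequencedTask>();
    private int status = 0;
    private boolean submitted = false;

    SequencedTask(Runnable body, Executor executor) {
        this.body = body;
        this.executor = executor;
    }

    // Declare that 'before' must complete before this task may run.
    void addPreceding(SequencedTask before) {
        synchronized (GRAPH) {
            if (before.status == 1) return; // already done, nothing to wait for
            precedents.add(before);
            before.dependants.add(this);
        }
    }

    // Submit this task if nothing is outstanding, otherwise prime what it waits on.
    void prime() {
        List<SequencedTask> waitingOn;
        synchronized (GRAPH) {
            if (status != 0 || submitted) return;
            waitingOn = new ArrayList<SequencedTask>(precedents);
            if (waitingOn.isEmpty()) submitted = true;
        }
        if (waitingOn.isEmpty()) {
            executor.execute(this);
        } else {
            for (SequencedTask precedent : waitingOn) precedent.prime();
        }
    }

    public void run() {
        int result;
        try {
            body.run();
            result = 1;
        } catch (RuntimeException e) {
            result = -1;
        }
        finish(result);
    }

    // Record the outcome, then tell every dependant outside the lock.
    private void finish(int result) {
        List<SequencedTask> toNotify;
        synchronized (GRAPH) {
            status = result;
            toNotify = new ArrayList<SequencedTask>(dependants);
        }
        for (SequencedTask dependant : toNotify) dependant.precedentFinished(this, result);
    }

    private void precedentFinished(SequencedTask precedent, int result) {
        boolean ready = false;
        boolean failed = false;
        synchronized (GRAPH) {
            if (status != 0 || submitted) return; // already running or settled
            if (result == 1) {
                precedents.remove(precedent);
                ready = precedents.isEmpty();
                submitted = ready;
            } else {
                status = -1;
                failed = true;
            }
        }
        if (ready) executor.execute(this);
        if (failed) finish(-1); // a failed precedent fails this task and its dependants
    }

    int status() {
        synchronized (GRAPH) { return status; }
    }
}
```

With a same-thread executor, wiring d after b and c, both after a, and
calling d.prime() runs a first and d last. If a task throws, everything
downstream of it ends up with status -1 without running.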

