uima-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dietmar Gräbner <d.graeb...@gmail.com>
Subject Re: UimaAS blocks when accessing a queue with multiple clients concurrently
Date Thu, 09 Dec 2010 18:15:40 GMT
thx for the example - I'll test it tomorrow.

Best regards

On Thu, Dec 9, 2010 at 5:21 PM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
> For some reason attachment dont seem to work. Here is my code:
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *   http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing,
>  * software distributed under the License is distributed on an
>  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>  * KIND, either express or implied.  See the License for the
>  * specific language governing permissions and limitations
>  * under the License.
>  */
>
>
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.uima.aae.client.UimaAsynchronousEngine;
> import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
> import org.apache.uima.cas.CAS;
>
> /**
>  * Example client application that can instantiate multiple UIMA AS clients
> each running in
>  * a separate thread.
>  * <p>
>  * Arguments: brokerUrl endpoint <howManyCASesToSend> <scaleup>
>  * <p>
>  * The application creates as many UIMA AS clients and threads as specified
> in the "scaleup"
>  * argument. Each instance runs in its own thread and has is its own temp
> reply queue. It
>  * uses a synchronous <i>sendAndReceive()</i> to send CASes to a remote
> service. For this
>  * a CAS Pool containing a single CAS is sufficient.
>  * <p>
>  * Each client sends as many CASes to a remote service as specified in the
> "howManyCASesToSend"
>  * argument.
>  * <p>
>  * The application initializes a CountDownLatch to the number of
> clients/threads which is than
>  * used to await completion. When a worker thread completes its run, it
> sends a CPC and counts down the
>  * latch reducing the number of busy threads. When all threads finish, the
> application is notified
>  * and can proceed to cleanup and shutdown.
>  *
>  */
> public class MultithreadedClientApp {
> public CountDownLatch latch = null;
> public void initializeAndRun(String[] args) throws Exception {
> try {
> int howManyWorkers = Integer.parseInt(args[3]);  // how many threads
> latch = new CountDownLatch(howManyWorkers);  // each worker counts down when
> done
> // Create Worker threads
> ClientWorker[] workers = new ClientWorker[howManyWorkers];
> final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
> howManyWorkers);
> // Thread Pool Executor to manages threads
> ThreadPoolExecutor threadPool = new ThreadPoolExecutor(howManyWorkers,
> howManyWorkers,
>                Integer.MAX_VALUE, TimeUnit.SECONDS, queue);
> // Start all threads
> threadPool.prestartAllCoreThreads();
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i] = new ClientWorker();
> // 0 - brokerURL, 1 - queue name, 2 - how many CASes to send
> workers[i].initialize(args[0],args[1],Integer.parseInt(args[2]));
> }
>
> for( int i=0; i < howManyWorkers; i++ ) {
> threadPool.submit(workers[i]);  // start the workers
> }
> // Each worker counts down the latch after it is done sending CASes
> latch.await();
>  // All worker threads completed, now stop the clients
> for( int i=0; i < howManyWorkers; i++ ) {
> workers[i].stop();  // stop UIMA AS clients
> }
>
> threadPool.shutdown();  // cleanup thread pool
>  System.out.println("All UIMA AS Clients Finished Processing");
> } catch( Exception e ) {
> e.printStackTrace();
> }
>  }
> public static void main(String[] args) {
> MultithreadedClientApp client = new MultithreadedClientApp();
> try {
> if ( args.length != 4 ) {
> System.out.println("Usage: ");
> }
> client.initializeAndRun(args);
> } catch( Exception e ) {
> e.printStackTrace();
> }
> }
> public class ClientWorker implements Runnable {
> private BaseUIMAAsynchronousEngine_impl uimaASClient = null;
> private int howManyCASes = 0;
> public void initialize(String brokerUrl, String endpoint, int howManyCASes )
> throws Exception {
>  uimaASClient = new BaseUIMAAsynchronousEngine_impl();
> Map<String, Object> appCtx = new HashMap<String, Object>();
>    // set server URI and Endpoint
>    // Add Broker URI
>    appCtx.put(UimaAsynchronousEngine.ServerUri, brokerUrl);
>    // Add Queue Name
>    appCtx.put(UimaAsynchronousEngine.Endpoint, endpoint);
>    // Add the Cas Pool Size and initial FS heap size
>    appCtx.put(UimaAsynchronousEngine.CasPoolSize, 1);
>
>    // initialize
>    uimaASClient.initialize(appCtx);
>    this.howManyCASes = howManyCASes;
> }
> public void stop() {
> uimaASClient.stop();
> }
> public void run() {
> try {
>    int sentSoFar = 0;
>             CAS cas = uimaASClient.getCAS();
>             int count=1;
>     while( sentSoFar < howManyCASes ) {
>
>          cas.setDocumentText("Some Text");
>
>          uimaASClient.sendAndReceiveCAS(cas);
>          System.out.println("Thread:"+Thread.currentThread().getId()+":::
> Success Calling sendAndReceiveCAS(). Sent "+count++ + " CASes so far");
>          cas.reset();
>          sentSoFar++;
>     }
>     uimaASClient.collectionProcessingComplete();
>     System.out.println("Thread::"+Thread.currentThread().getId()+" Sent
> CPC. Thread Done");
>     latch.countDown();
> } catch( Exception e) {
> e.printStackTrace();
> }
> }
> }
> }
>
>
>
>
> On Thu, Dec 9, 2010 at 11:04 AM, Jaroslaw Cwiklik <uimaee@gmail.com> wrote:
>
>> Dietmar, I tried my example application with an Aggregate Service and see
>> no problem. Your previous email had no source attached.
>> Attached please find an example application code I use in my testing. To
>> run it"
>>
>> java -cp <classpath> MultithreadedClientApp
>> <brokerURL><serviceQueueName><howManyCASesEachThreadShouldSend><howManyThreads>
>>
>> The code adds a short text to each CAS before each call to
>> sendAndReceive(). There are no app listeners attached to UIMA AS client.
>>
>> Jerry
>>
>> 2010/12/9 Dietmar Gräbner <d.graebner@gmail.com>
>>
>> Hi Eddie,
>>>
>>> wouldn't the client requests be serialized in the szenario you propose?
>>>
>>> Dietmar
>>>
>>>
>>>
>>> On Tue, Dec 7, 2010 at 10:54 PM, Eddie Epstein <eaepstein@gmail.com>
>>> wrote:
>>> > 2010/12/7 Dietmar Gräbner <d.graebner@gmail.com>:
>>> >> I wrote a test client creating multiple threads. Each thread
>>> >> instantiates a BaseUIMAAsynchronousEngine_impl and invokes a uima
>>> >> aggregate with the sendAndReceiveCAS() call. When running the program
>>> >> with e.g. 100 Threads the client gets stuck after processing X calls.
>>> >
>>> > FWIW, a similar multithreaded client scenario that has been used with
>>> > no problems is to instantiate a single BaseUIMAAsynchronousEngine_impl
>>> > with big enough casPool and have each thread call sendAndReceiveCAS()
>>> > using the common API object.
>>> >
>>> > Eddie
>>> >
>>>
>>
>>
>

Mime
View raw message