uima-user mailing list archives

From Jörn Kottmann <kottm...@gmail.com>
Subject Re: UIMA-AS and CasManager.defineCasPool() was called twice by the same Analysis Engine
Date Mon, 22 Jun 2009 10:24:20 GMT
Eddie Epstein wrote:
>> I reduced my AAE to three delegate AEs:
>>
>> 1. HBaseCasMultiplier -> fetches the actual text from HBase
>> 2. Tokenizer -> adds tokens to my CAS
>> 3. HBaseWriter -> writes the tokens back into HBase
>>
>> These delegates are not thread-safe, so to scale these AEs
>> one instance per worker thread must be created.
>> That's what I want UIMA AS to do for me, and I think that's
>> also the case which is described in the documentation in 1.4.1:
>>
>> "... The classes for annotators and flow controllers do not need to be
>> "thread-safe" with respect to their instance data - meaning, they do not
>> need to be implemented with synchronization locks for access to their
>> instance data, because each instance will only be called using one thread
>> at a time. Scale out for these classes is done using multiple instances
>> of the class. ..."
>>
>>     
>
> That documentation is correct, but apparently not as clear as we'd like.
>
> Note that the following paragraph in the documentation goes on to say
>  "However, if you have class "static" fields shared by all instances,
>   or other kinds of external data shared by all instances (such as a
>   writable file), you must be aware of the possibility of multiple threads
>   accessing these fields or external resources, running on separate
>   instances of the class, and do any required synchronization for these."
>
> So, barring any static fields or resources that would cause problems with
> multiple instantiations, UIMA AS scaleout in the same JVM should work.
>   
Yes, sure. My AE instances do not share any data. I think
that paragraph goes without saying, but it's still good
to be reminded.

> Hmm, not clear to me that you want async=true. 
I am not sure whether I should run async or not. Right now
the analysis is running on one quad-core server.
I would like to set up UIMA AS so that it uses all the
CPU time of all cores for fetching documents from HBase,
writing them back to HBase, and for analysis.
The interaction with HBase leaves a thread idle for short
periods of time, which is why I probably need something like
10 threads for fetching and 10 threads for writing
to pump enough documents through the machine
to keep it busy.
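
As a rough sanity check on thread counts like these, a common rule of thumb for blocking work is threads = cores * (1 + wait time / compute time). The sketch below only illustrates that rule. The class name `PoolSizing`, the method `threadsFor`, and the 15 ms wait and 10 ms compute figures are hypothetical and were not measured on the system described here.

```java
public final class PoolSizing {

    /**
     * Rule-of-thumb thread count for tasks that block on I/O:
     * cores * (1 + waitMillis / computeMillis), rounded up.
     */
    public static int threadsFor(int cores, double waitMillis, double computeMillis) {
        if (cores <= 0 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("cores and computeMillis must be positive, waitMillis non-negative");
        }
        return (int) Math.ceil(cores * (1.0 + waitMillis / computeMillis));
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 4 cores, each task blocked 15 ms on HBase
        // for every 10 ms of CPU work.
        System.out.println(threadsFor(4, 15.0, 10.0)); // prints 10
    }
}
```

The real ratio of waiting to computing has to be measured per stage. Fetching, tokenizing, and writing will most likely each have a different one, which is an argument for scaling the delegates separately.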

Having the AAE async would have the advantage
that I only need 10 instances of the fetching CM and 10
instances of the writing delegate AE, not 20 instances
of the whole AAE. The same is true for analysis: there
I can scale just the AEs which are slow.
To scale the CM, though, I have to use the suggested
workaround.

So all in all I think async would be an advantage,
but for now it would be fine to run without it because
that seems easier.

> Assuming that your
> AE runs correctly as a single threaded aggregate, creating multiple
> instances of this seems fine. The correction to your previous deployment
> descriptor would just be:
>
>           <analysisEngine key="TextAnalysis" async="false">
>               <scaleout numberOfInstances="8" />
>           </analysisEngine>
>
> From the UIMA AS point of view, this component is not a CasMultiplier
> because [I assume] it consumes new CASes internally and does not
> return them.
>
> Let me emphasize that before AS scaleout the aggregate should be tested
> as a simple UIMA aggregate with the normal tools like CVD, runAE,
> or a custom driver.
>   
I tested the correction but got the first exception again.
Here is the full stack trace this time, not only the cause:

org.apache.uima.analysis_engine.AnalysisEngineProcessException: The method CasManager.defineCasPool() was called twice by the same Analysis Engine (/HBaseCasMultiplier/).
    at org.apache.uima.analysis_engine.asb.impl.ASB_impl$AggregateCasIterator.processUntilNextOutputCas(ASB_impl.java:699)
    at org.apache.uima.analysis_engine.asb.impl.ASB_impl$AggregateCasIterator.<init>(ASB_impl.java:407)
    at org.apache.uima.analysis_engine.asb.impl.ASB_impl.process(ASB_impl.java:340)
    at org.apache.uima.analysis_engine.impl.AggregateAnalysisEngine_impl.processAndOutputNewCASes(AggregateAnalysisEngine_impl.java:267)
    at org.apache.uima.aae.controller.PrimitiveAnalysisEngineController_impl.process(PrimitiveAnalysisEngineController_impl.java:376)
    at org.apache.uima.aae.handler.HandlerBase.invokeProcess(HandlerBase.java:130)
    at org.apache.uima.aae.handler.input.ProcessRequestHandler_impl.handleProcessRequestFromRemoteDelegate(ProcessRequestHandler_impl.java:453)
    at org.apache.uima.aae.handler.input.ProcessRequestHandler_impl.handle(ProcessRequestHandler_impl.java:896)
    at org.apache.uima.aae.handler.input.MetadataRequestHandler_impl.handle(MetadataRequestHandler_impl.java:84)
    at org.apache.uima.adapter.jms.activemq.JmsInputChannel.onMessage(JmsInputChannel.java:665)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:485)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:442)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:414)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:309)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:254)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:871)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:818)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at org.apache.uima.aae.UimaAsThreadFactory$1.run(UimaAsThreadFactory.java:69)
    at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.uima.UIMARuntimeException: The method CasManager.defineCasPool() was called twice by the same Analysis Engine (/HBaseCasMultiplier/).
    at org.apache.uima.resource.impl.CasManager_impl.defineCasPool(CasManager_impl.java:181)
    at org.apache.uima.resource.impl.CasManager_impl.defineCasPool(CasManager_impl.java:161)
    at org.apache.uima.aae.EECasManager_impl.defineCasPool(EECasManager_impl.java:75)
    at org.apache.uima.impl.UimaContext_ImplBase.getEmptyCas(UimaContext_ImplBase.java:565)
    at org.apache.uima.analysis_component.CasMultiplier_ImplBase.getEmptyCAS(CasMultiplier_ImplBase.java:109)
    at dk.infopaq.nlp.repository.connector.HBaseReadCasMultiplier.hasNext(HBaseReadCasMultiplier.java:107)
    at org.apache.uima.analysis_engine.impl.PrimitiveAnalysisEngine_impl$AnalysisComponentCasIterator.hasNext(PrimitiveAnalysisEngine_impl.java:563)
    at org.apache.uima.analysis_engine.asb.impl.ASB_impl$AggregateCasIterator.processUntilNextOutputCas(ASB_impl.java:566)
    ... 20 more

>> Ok, I changed it to fit the case described above:
>>           <analysisEngine>
>>               <delegates>
>>                   <analysisEngine key="HBaseCasMultiplier">
>>                       <casMultiplier poolSize="4"/>
>>                       <scaleout numberOfInstances="2" />
>>                   </analysisEngine>
>>                   <analysisEngine key="Tokenizer">
>>                       <scaleout numberOfInstances="4" />
>>                   </analysisEngine>
>>                   <analysisEngine key="HBaseWriter">
>>                       <scaleout numberOfInstances="4" />
>>                   </analysisEngine>
>>               </delegates>
>>           </analysisEngine>
>>
>> I would like to scale the HBaseCasMultiplier to more than two
>> threads, because there is a short delay when reading from HBase.
>> First, I am not sure which value I should choose for the
>> CAS multiplier pool size. If numberOfInstances gets larger
>> than two, I get a few exceptions (stack trace below) when UIMA AS
>> starts to process the first documents. So I think I am doing something
>> wrong here. And what is the minimal possible casPoolSize? I need
>> CAS instances for my 4 Tokenizers, 4 HBaseWriters
>> and 4 (?) for the CAS multiplier, which would result in a minimum
>> size of 12, right?
>>
>> The HBaseCasMultiplier gets one CAS that contains the id and
>> then outputs one CAS that contains the actual text.
>>     
>
> Supporting the complexities raised by Cas multipliers has been quite
> challenging. I'm pretty sure that a co-located CM cannot be scaled; we
> need to check this and clarify the situation. (This is different from having
> more than one CM in the same aggregate, which is supported with
> the latest code.)
>
> Here is a possible workaround to run this aggregate asynchronously.
> If I understand your scenario, each input Cas is tiny, the CM creates
> a new Cas with the document to be processed and consumed by
> HBaseWriters, and finally the aggregate returns just the tiny input Cas.
> The workaround is to have the CM create a new Cas, but not fetch
> the document. Add a new delegate immediately following the CM,
> say CorpusReader, which fills the new CASes with documents and can
> be scaled out as desired.
>   
How does CorpusReader get the id that is included in the input Cas?
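
One way to picture the suggested workaround, and the point this question turns on, is the plain-Java sketch below. It deliberately uses no UIMA classes: `Doc`, `multiply`, `fetch`, and `run` are hypothetical stand-ins. It also assumes that the CM copies the id into the new CAS so that the following delegate can look up the document, which nobody in this thread has confirmed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public final class WorkaroundSketch {

    /** Hypothetical stand-in for a CAS: holds a document id and, later, its text. */
    public static final class Doc {
        public final String id;
        public volatile String text; // null until the reader stage fills it in
        Doc(String id) { this.id = id; }
    }

    /** Stage 1, single instance like the unscaled CM: create a new Doc, copy only the id. */
    public static Doc multiply(String inputId) {
        return new Doc(inputId);
    }

    /** Stage 2, scaled out like CorpusReader: a fake fetch keyed on the copied id. */
    public static Doc fetch(Doc doc) {
        doc.text = "text-of-" + doc.id; // assumption: a real reader would query HBase here
        return doc;
    }

    /** Runs stage 1 on the calling thread and stage 2 on a pool of reader threads. */
    public static List<Doc> run(List<String> ids, int readers) {
        ExecutorService pool = Executors.newFixedThreadPool(readers);
        try {
            List<Future<Doc>> pending = new ArrayList<>();
            for (String id : ids) {
                final Doc doc = multiply(id);
                Callable<Doc> task = () -> fetch(doc);
                pending.add(pool.submit(task));
            }
            List<Doc> filled = new ArrayList<>();
            for (Future<Doc> f : pending) {
                filled.add(f.get()); // collected in submission order
            }
            return filled;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("reader stage failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Doc d : run(Arrays.asList("doc-1", "doc-2", "doc-3"), 2)) {
            System.out.println(d.id + " -> " + d.text);
        }
    }
}
```

In this picture only the cheap step stays single-threaded, and the slow fetch is the part that gets scaled. Whether a UIMA AS delegate can receive the id this way is exactly what the question above asks.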

Jörn
