pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [pulsar] nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error
Date Sun, 16 Feb 2020 19:22:56 GMT
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586734562
 
 
   @tuteng Yes, I am running Pulsar standalone 2.5.0 from the binaries (extracted from the binary release [here](https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.5.0/apache-pulsar-2.5.0-bin.tar.gz)).

   I am running it on macOS 10.15.3 with Python 3.7.3 and pulsar-client 2.5.0, installed globally with pip.
   
   The Pulsar function copies the value from one topic with an Avro schema directly to another topic, roughly like this (I simplified my Pulsar function by removing the parts that are not needed for the example):
   
   ```python
   import io

   import fastavro
   from pulsar import Function, SerDe
   from pulsar.schema import Double, Long, Record


   class SensorSchema(Record):
       value = Double(required=True)
       timestamp = Long(required=True)


   class AvroSerDe(SerDe):
       """Pass-through SerDe: process() already returns Avro-encoded bytes."""

       def serialize(self, output):
           return output

       def deserialize(self, input_bytes):
           return input_bytes


   class BaseFunction(Function):
       def __init__(self):
           # Parse the Avro schema generated from the Record definition once.
           self.parsed_schema = fastavro.parse_schema(SensorSchema.schema())

       def process(self, input, context):
           # Decode the schemaless Avro body of the incoming message.
           record = fastavro.schemaless_reader(io.BytesIO(input), self.parsed_schema)
           new_value = self.customProcess(record, context)
           # Re-encode the result as a schemaless Avro body.
           out_buffer = io.BytesIO()
           fastavro.schemaless_writer(out_buffer, self.parsed_schema, new_value)
           return out_buffer.getvalue()

       def customProcess(self, input, context):
           # Placeholder; subclasses override this.
           return {}


   class CopyFunction(BaseFunction):
       def customProcess(self, input, context):
           return {'value': input['value'], 'timestamp': input['timestamp']}
   ```
   I create the Pulsar function with pulsar-admin like this:
   
   ```
   ./pulsar-admin functions create \
     --py copyFunction.py \
     --classname copyFunction.CopyFunction \
     --schema-type SensorSchema \
     --output-serde-classname copyFunction.AvroSerDe \
     --tenant public \
     --namespace default \
     --name copyFunction1 \
     --inputs persistent://public/default/topic1 \
     --output persistent://public/default/topic2
   ```
   
   The problem is the following:
   
   1. The input topic (topic1) is created by a Java producer. At first my Pulsar function is not running, so the sink copies only topic1 to the DB and everything works well.
   2. I start the Pulsar function and it creates topic2 correctly; if I start a consumer on topic2, I also see the values generated by the Pulsar function as expected.
   3. Now the sink stops working and shows this:
   
   ```
   20:19:51.224 [public/default/influxdb-test-sink-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/influxdb-test-sink:0] Uncaught exception in Java Instance
   java.lang.NullPointerException: null
   	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[java-instance.jar:?]
   	at com.google.common.cache.LocalCache.get(LocalCache.java:3950) ~[java-instance.jar:?]
   	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3973) ~[java-instance.jar:?]
   	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4957) ~[java-instance.jar:?]
   	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:98) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:96) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.client.impl.schema.AutoConsumeSchema.decode(AutoConsumeSchema.java:39) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.client.api.Schema.decode(Schema.java:95) ~[java-instance.jar:?]
   	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:273) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.client.impl.TopicMessageImpl.getValue(TopicMessageImpl.java:143) ~[org.apache.pulsar-pulsar-client-original-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.functions.source.PulsarRecord.getValue(PulsarRecord.java:74) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:2.5.0]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:472) ~[org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
   	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:246) [org.apache.pulsar-pulsar-functions-instance-2.5.0.jar:?]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]
   20:19:51.230 [public/default/influxdb-test-sink-0] INFO  org.apache.pulsar.functions.instance.JavaInstanceRunnable - Closing instance
   20:19:51.233 [public/default/influxdb-test-sink-0] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/topic2] [public/default/influxdb-test-sink] Closed consumer
   20:19:51.235 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/channel1_output4] [public/default/influxdb-test-sink] Closed consumer
   20:19:51.235 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/channel2] [public/default/influxdb-test-sink] Closed consumer
   20:19:51.236 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/channel1] [public/default/influxdb-test-sink] Closed consumer
   20:19:51.236 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [MultiTopicsConsumer-a98fd] [public/default/influxdb-test-sink] Closed Topics Consumer
   ```
   
   My understanding was that this should be fixed in 2.5.0, but in my case the fix does not seem to work.

   I can provide any other information if needed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
