pulsar-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [pulsar] nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error
Date Mon, 17 Feb 2020 10:20:31 GMT
nicolo-paganin edited a comment on issue #5454: mysql JDBC Sink - consumer error
URL: https://github.com/apache/pulsar/issues/5454#issuecomment-586914531
 
 
   @sijie I did the tests; here are the results:
   
   **Using the Python producer**
   I am using this producer:
   
    ```python
    import pulsar
    import time
    import calendar
    from pulsar.schema import *
    
    class SensorSchema(Record):
        value = Double(required=True)
        timestamp = Long(required=True)
    
    client = pulsar.Client('pulsar://localhost:6650')
    
    producer = client.create_producer('topic_python_producer', schema=AvroSchema(SensorSchema))
    
    # Send one record per second with the current epoch time in milliseconds
    i = 0
    while True:
        i += 1
        message = SensorSchema(value=i, timestamp=calendar.timegm(time.gmtime()) * 1000)
        print(message)
        producer.send(message)
        time.sleep(1)
    
    client.close()
    ```
   
   The sink works fine: no errors, and I can see the data in the database.
   
   **Using a Pulsar function**
   I am using the Pulsar function below. I removed all my customizations, so I am running only
   released artifacts, and I still get the error. So the problem is related only to Pulsar Functions.
   
    ```python
    from pulsar import Function
    from pulsar.schema import *
    import fastavro
    import io
    
    class SensorSchema(Record):
        value = Double(required=True)
        timestamp = Long(required=True)
    
    
    class CopyFunction(Function):
        def __init__(self):
            # Avro schema dict generated from the Record definition
            self.parsed_schema = SensorSchema.schema()
    
        def process(self, input, context):
            # Decode the incoming schemaless Avro payload
            buffer = io.BytesIO(input)
            d = fastavro.schemaless_reader(buffer, self.parsed_schema)
            output = {'value': d['value'], 'timestamp': d['timestamp']}
    
            # Re-encode the record as schemaless Avro and return the raw bytes
            outbuffer = io.BytesIO()
            fastavro.schemaless_writer(outbuffer, self.parsed_schema, output)
            return outbuffer.getvalue()
    ```
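
   As a side note, the schemaless Avro round trip used inside `process` can also be exercised on
   its own with `fastavro`, outside of a running function; a minimal sketch with made-up sample
   values:
   
    ```python
    import io
    import fastavro
    from pulsar.schema import Record, Double, Long
    
    class SensorSchema(Record):
        value = Double(required=True)
        timestamp = Long(required=True)
    
    # Avro schema dict generated from the Record definition, as in CopyFunction.__init__
    parsed_schema = SensorSchema.schema()
    
    # Encode a sample record the same way CopyFunction writes its output ...
    buf = io.BytesIO()
    fastavro.schemaless_writer(buf, parsed_schema, {'value': 1.0, 'timestamp': 1581934831000})
    
    # ... and decode it the same way CopyFunction reads its input
    decoded = fastavro.schemaless_reader(io.BytesIO(buf.getvalue()), parsed_schema)
    print(decoded)  # expected: {'value': 1.0, 'timestamp': 1581934831000}
    ```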
   
   I created my function in this way, passing the `--schema-type` param:
   
   ```
   ./pulsar-admin functions create \
     --py copyFunction.py \
     --classname copyFunction.CopyFunction \
     --schema-type SensorSchema  \
     --tenant public \
     --namespace default \
     --name test_function1 \
     --inputs persistent://public/default/topic_java \
     --output persistent://public/default/topic_python
   ```
   
   I don't know how that `--schema-type` param works: I am passing the schema name of another
   topic. From the logs it seems the schema is found and associated with the topic, because running
   `pulsar-admin schemas get topic_python` gives the output below. Moreover, if I run any consumer
   on that topic I can read the values without problems (a minimal consumer sketch follows the
   schema output).
   
   ```
   {
     "version": 0,
     "schemaInfo": {
       "name": "topic_python",
       "schema": {
         "type": "record",
         "name": "SensorSchema",
         "namespace": "it.oncode.argo.api.actors",
         "fields": [
           {
             "name": "value",
             "type": "double"
           },
           {
             "name": "timestamp",
             "type": "long"
           }
         ]
       },
       "type": "AVRO",
       "properties": {}
     }
   }
   ```
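
   For completeness, this is roughly the consumer I mean; a minimal sketch using the Python client
   with the same `SensorSchema` (the subscription name is just a placeholder):
   
    ```python
    import pulsar
    from pulsar.schema import Record, Double, Long, AvroSchema
    
    class SensorSchema(Record):
        value = Double(required=True)
        timestamp = Long(required=True)
    
    client = pulsar.Client('pulsar://localhost:6650')
    
    # Subscribe to the function's output topic with the same Avro schema
    consumer = client.subscribe('persistent://public/default/topic_python',
                                'test-subscription',
                                schema=AvroSchema(SensorSchema))
    
    while True:
        msg = consumer.receive()
        record = msg.value()  # deserialized SensorSchema instance
        print(record.value, record.timestamp)
        consumer.acknowledge(msg)
    ```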
