apex-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: KryoException to write Hbase
Date Fri, 21 Oct 2016 05:19:50 GMT
Hi Jaspal,

you can change following line

Gson gson = new Gson();

to

transient Gson gson = new Gson();

This should work.

Operators are serialised using kryo during initial deployment as well
as during checkpoints. kryo is throwing an exception while
trying to serialize gson field in your operator, as Gson does not
seems to be serializable with kryo. Gson instance does not
maintain any useful state, you can safely make it transient and
initialise it in constructor of operator.

- Tushar.



On Fri, Oct 21, 2016 at 9:52 AM, Jaspal Singh
<jaspal.singh1404@gmail.com> wrote:
> Thomas,
>
> Can you please highlight how the Gson has to be a transient member ? Below
> is our operator code.
>
> public class HbaseTableUpdate<T> extends
> AbstractHBasePutOutputOperator<Tenant> implements Serializable {
>  private static final transient Logger logger =
> LoggerFactory.getLogger(HbaseTableUpdate.class);
>  public static final int DEFAULT_BATCH_SIZE = 1000;
>  private int batchSize = DEFAULT_BATCH_SIZE;
>  protected int unCommittedSize = 0;
>  public static final byte[] AUDIT_CF_LOG = "cf".getBytes();
>  public static final byte[] AUDIT_MESSAGE = "msg".getBytes();
>  public static final byte[] AUDIT_STATUS = "sts".getBytes();
>  public static final byte[] AUDIT_RETRY_CNT = "rtc".getBytes();
>  //Configuration conf = HBaseConfiguration.create();
>  //AuditLogDAO auditLogDAO = new AuditLogDAO(conf);
>  Gson gson = new Gson();
>  //ObjectMapper mapper = new ObjectMapper();
>
>  public HbaseTableUpdate() {
>      store = new HBaseStore();
>
> store.setTableName("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/tenant");
>  }
>
>
>
>  @Override
>  public void processTuple(Tenant tenant) {
>      HTable table = store.getTable();
>      Put put = operationPut(tenant);
>      try {
>          table.put(put);
>          if( ++unCommittedSize >= batchSize )
>          {
>              table.flushCommits();
>              unCommittedSize = 0;
>          }
>      } catch (RetriesExhaustedWithDetailsException e) {
>          logger.error("Could not output tuple", e);
>          DTThrowable.rethrow(e);
>      } catch (InterruptedIOException e) {
>          logger.error("Could not output tuple", e);
>          DTThrowable.rethrow(e);
>      }
>      catch (IOException io) {
>
>      }
>
>  }
>
>  @Override
>  public void endWindow()
>  {
>      try
>      {
>          if( unCommittedSize > 0 ) {
>              store.getTable().flushCommits();
>              unCommittedSize = 0;
>          }
>      }
>      catch (RetriesExhaustedWithDetailsException e) {
>          logger.error("Could not output tuple", e);
>          DTThrowable.rethrow(e);
>      } catch (InterruptedIOException e) {
>          logger.error("Could not output tuple", e);
>          DTThrowable.rethrow(e);
>      }
>      catch (IOException io) {
>
>      }
>  }
>
>  @Override
>  public Put operationPut(Tenant tenant) {
>      String rowKey = tenant.getVolumeName() + System.currentTimeMillis();
>      AuditLog log;
>          String msgjson = gson.toJson(tenant);
>
>          if (StringUtils.isNotEmpty(tenant.getGl())) {
>              log = new AuditLog(msgjson,
> Status.VALIDATION_SUCCESS.toString());
>          } else {
>              log = new AuditLog(msgjson,
> Status.VALIDATION_FAILED.toString());
>          }
>
>      Put p = new Put(Bytes.toBytes(rowKey));
>      p.addColumn(AUDIT_CF_LOG, AUDIT_MESSAGE,
> Bytes.toBytes(log.getMessage()));
>      p.addColumn(AUDIT_CF_LOG, AUDIT_STATUS,
> Bytes.toBytes(log.getStatus()));
>      p.addColumn(AUDIT_CF_LOG, AUDIT_RETRY_CNT,
> Bytes.toBytes(log.getUpdateCount()));
>      return p;
>  }
>
>
>
> Thanks!!
>
>
>
> On Thu, Oct 20, 2016 at 4:53 PM, Thomas Weise <thw@apache.org> wrote:
>>
>> Please make sure that your Gson parser is a transient member of the
>> operator.
>>
>> On Thu, Oct 20, 2016 at 2:33 PM, Bandaru, Srinivas
>> <srinivas.bandaru@optum.com> wrote:
>>>
>>> Hi,
>>>
>>> We are building a an apex (Datatorrent) application to write into Hbase.
>>> Getting the below error while launching the application. Is anyone had the
>>> similar issue?
>>>
>>>
>>>
>>> 2016-10-20 16:05:50,675 INFO org.apache.hadoop.service.AbstractService:
>>> Service com.datatorrent.stram.StreamingAppMasterService failed in state
>>> INITED; cause: com.esotericsoftware.kryo.KryoException: Encountered
>>> unregistered class ID: 95
>>>
>>> Serialization trace:
>>>
>>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>>
>>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>>
>>> constructorConstructor (com.google.gson.Gson)
>>>
>>> gson (com.example.datatorrent.HbaseTableUpdate)
>>>
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>> ID: 95
>>>
>>> Serialization trace:
>>>
>>> systemMap (com.google.gson.internal.ParameterizedTypeHandlerMap)
>>>
>>> instanceCreators (com.google.gson.internal.ConstructorConstructor)
>>>
>>> constructorConstructor (com.google.gson.Gson)
>>>
>>> gson (com.example.datatorrent.HbaseTableUpdate)
>>>
>>>
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Srinivas Bandaru
>>>
>>>
>>> This e-mail, including attachments, may include confidential and/or
>>> proprietary information, and may be used only by the person or entity
>>> to which it is addressed. If the reader of this e-mail is not the
>>> intended
>>> recipient or his or her authorized agent, the reader is hereby notified
>>> that any dissemination, distribution or copying of this e-mail is
>>> prohibited. If you have received this e-mail in error, please notify the
>>> sender by replying to this message and delete this e-mail immediately.
>>
>>
>

Mime
View raw message