flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split
Date Mon, 19 Sep 2016 20:17:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15504567#comment-15504567
] 

ASF GitHub Bot commented on FLINK-4311:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2330#discussion_r79470876
  
    --- Diff: flink-batch-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/TestTableInputFormat.java
---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HTable;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestTableInputFormat extends HBaseTestingClusterAutostarter {
    --- End diff --
    
    Long running integration tests have to follow the `*ITCase` naming pattern. This will
cause them to be executed in Maven's verify phase.


> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
>                 Key: FLINK-4311
>                 URL: https://issues.apache.org/jira/browse/FLINK-4311
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0,
active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
> 	at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
> 	at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
> 	at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
> 	at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
> 	at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b
rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0,
active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> 	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> 	at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
> 	... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the closes method closes the table.
> If a second split arrives on the same instance then the open method will fail because
the table has already been closed.
> We also found that this error varies with the versions of HBase that are used. I have
also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
> 	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
> 	... 37 more
> {quote}
> I found that in the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html]
is clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance can be opened
again after it was closed. That is due to the fact that the input format is used for potentially
multiple splits. After a split is done, the format's close function is invoked and, if another
split is available, the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this constraint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message