Hi Itamar and Till,

Yes, this actually looks a lot worse than it is, fortunately.

From what I understand, this means that something did not release or properly shut down a gRPC client, and the library likes to inform you about this. I would definitely expect to see this if the job crashes at the 'wrong' point.
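
A minimal sketch of the shutdown sequence the gRPC warning asks for: call shutdown(), wait with awaitTermination(), and fall back to shutdownNow() if the wait times out. io.grpc.ManagedChannel has methods with these names; to keep the sketch runnable with the JDK alone, it uses a java.util.concurrent.ExecutorService as a stand-in, and the class and method names (GracefulShutdown, shutdownAndAwait) are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {

    /**
     * Shuts a resource down in the order the gRPC warning suggests:
     * shutdown(), then awaitTermination(), then shutdownNow() as a fallback.
     * ExecutorService is only a stand-in for a gRPC ManagedChannel here.
     * Returns true if the resource reached the terminated state.
     */
    static boolean shutdownAndAwait(ExecutorService service, long timeout, TimeUnit unit) {
        service.shutdown(); // stop accepting new work; already submitted work may finish
        try {
            if (service.awaitTermination(timeout, unit)) {
                return true;
            }
            service.shutdownNow(); // interrupt running work and drop queued work
            return service.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit(() -> System.out.println("doing some work"));
        boolean terminated = shutdownAndAwait(service, 5, TimeUnit.SECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

Waiting first gives in-flight work a chance to complete; shutdownNow() is only used when the graceful path does not finish in time.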

As you can see in the issue, they did fix or change this at some point. Do you have a way to reproduce this, in particular how or when the serialization problem occurs? I'll try updating the libraries and see whether that removes the verbose logs.



On Wed, Jan 15, 2020 at 5:37 PM Till Rohrmann <trohrmann@apache.org> wrote:
Hi Itamar,

could you share a few more details about the serialization problem? Which class is not serializable, and where does it originate from?


On Tue, Jan 14, 2020 at 9:47 PM Itamar Syn-Hershko <itamar@bigdataboutique.com> wrote:

I was able to track this down. Essentially, it was a deserialization error that propagated and might have prevented the channel from shutting down properly. This could be considered a problem in its own right, but I'm now further down the rabbit hole, chasing a solution for the original deserialization issue.
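
A minimal sketch of why an exception on the deserialization path can leave a channel open, and one common way to guard against it: tie the shutdown to try-with-resources so it runs even when deserialization throws. DemoSubscriber, pull() and deserialize() are hypothetical stand-ins, not the Flink PubSubSource or gRPC API, and whether the connector's own cleanup path behaves this way is an assumption:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnFailure {

    /** Hypothetical stand-in for a gRPC-backed subscriber; not a Flink or gRPC class. */
    static final class DemoSubscriber implements AutoCloseable {
        final AtomicBoolean shutdown = new AtomicBoolean(false);

        byte[] pull() {
            // Pretend the broker returned a payload the deserializer cannot handle.
            return "not-a-number".getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            // Real code would call channel.shutdown() and awaitTermination() here.
            shutdown.set(true);
        }
    }

    /** Hypothetical deserializer that throws NumberFormatException on malformed input. */
    static int deserialize(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    /** Reads one message; close() runs even if deserialize() throws. */
    static int readOne(DemoSubscriber subscriber) {
        try (DemoSubscriber s = subscriber) {
            return deserialize(s.pull());
        }
    }

    public static void main(String[] args) {
        DemoSubscriber subscriber = new DemoSubscriber();
        try {
            readOne(subscriber);
        } catch (NumberFormatException e) {
            System.out.println("deserialization failed: " + e.getMessage());
        }
        System.out.println("shut down: " + subscriber.shutdown.get());
    }
}
```

Without the try-with-resources (or an equivalent finally block), the exception would skip close() and the channel would only be noticed later, when the library finds it unreferenced but never shut down.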

Thanks for the help!

On Tue, Jan 14, 2020 at 8:26 PM Till Rohrmann <trohrmann@apache.org> wrote:
Hi Itamar,

for further debugging it would be helpful to get the full logs of Flink and more information about your environment. Since I'm not too familiar with Flink's PubSub connector, I have pulled in Richard (original author), Becket and Robert (both helped with reviewing and merging this connector). They might know what's going on.

The problem looks a bit similar to [1]. Maybe it would help to upgrade to a google-cloud-pubsub version newer than 1.62.0. The others might know more about it.


On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <itamar@bigdataboutique.com> wrote:
Hi all,

We are trying to use the PubSub source in a very minimal, basic Flink application as a POC, and we are getting the following error consistently, every couple of seconds. What am I missing?

io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
    at org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
    at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.base/java.lang.Thread.run(Thread.java:834)




Itamar Syn-Hershko
CTO, Founder