flink-user mailing list archives

From Chris Schneider <cschnei...@scaleunlimited.com>
Subject Re: Help with OneInputStreamOperatorTestHarness
Date Thu, 19 Apr 2018 03:07:58 GMT
Hi Ted,

I should have written that we’re using Flink 1.4.0.

Thanks for the suggestion re: FLINK-8268 <https://issues.apache.org/jira/browse/FLINK-8268>;
it could well be the issue (though the pull request <https://github.com/apache/flink/pull/5193/files>
appears fairly complex, so I’ll need some time to study it).

Best Regards,

- Chris

> On Apr 18, 2018, at 6:33 PM, Ted Yu <yuzhihong@gmail.com> wrote:
> 
> Which release are you using?
> 
> See if the workaround from FLINK-8268 helps.
> 
> Cheers
> 
> On Wed, Apr 18, 2018 at 6:26 PM, Chris Schneider <cschneider@scaleunlimited.com> wrote:
> Hi Gang,
> 
> I’m having trouble getting my streaming unit test to work. The following code:
> 
>     @Test
>     public void testDemo() throws Throwable {
>         OneInputStreamOperatorTestHarness<CrawlStateUrl, CrawlStateUrl> testHarness =
>             new KeyedOneInputStreamOperatorTestHarness<String, CrawlStateUrl, CrawlStateUrl>(
>                 new StreamFlatMap<>(new DomainDBFunction()),
>                 new PldKeySelector<CrawlStateUrl>(),
>                 BasicTypeInfo.STRING_TYPE_INFO,
>                 1,
>                 1,
>                 0);
>         testHarness.setup();
>         testHarness.open();
> 
>         for (int i = 0; i < 10; i++) {
>             String urlString = String.format("https://domain-%d.com/page1", i);
>             CrawlStateUrl url = new CrawlStateUrl(new RawUrl(urlString));
>             testHarness.processElement(new StreamRecord<>(url));
>         }
>         testHarness.snapshot(0L, 0L);
>     }
> 
> 
> Generates the following exception:
> 
> DomainDBFunctionTest.testDemo
> testDemo(com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest)
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask (1/1).
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> 	at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
> 	at com.scaleunlimited.flinkcrawler.functions.DomainDBFunctionTest.testDemo(DomainDBFunctionTest.java:51)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
> 	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
> 	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
> 	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
> 	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
> 	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:95)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> 	... 26 more
> 
> I tried explicitly calling testHarness.setStateBackend(new MemoryStateBackend()), but
> that didn’t seem to help. I could provide more of my code (e.g., PldKeySelector, DomainDBFunction,
> CrawlStateUrl, RawUrl, etc.), but that doesn’t seem like it would have much to do with the
> problem.
> 
> Any advice would be most welcome.
> 
> Thanks,
> 
> - Chris
> 
> -----------------------------------------
> Chris Schneider
> http://www.scaleunlimited.com
> custom big data solutions
> -----------------------------------------
> 
> 

-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------

