beam-commits mailing list archives

From "Andre (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2404) BigQueryIO reading stalls if no data is returned by query
Date Mon, 10 Jul 2017 16:08:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080563#comment-16080563 ]

Andre commented on BEAM-2404:
-----------------------------

Sorry guys, got distracted with other stuff. It looks like the problem is introduced by the
write step when the read returns no rows.

For example, the following code runs fine when data is read (with the WHERE clause removed)
but fails with the exception below when the query returns nothing.

{code:java}
PCollection<TableRow> rows = p.apply("ReadFromBQ", BigQueryIO.read()
   .fromQuery("SELECT * FROM [project:dataset.table] WHERE 1 = 2")
   .withoutResultFlattening());

rows.apply("WriteToBQ", BigQueryIO.writeTableRows()
   .to(targetTable)
   .withSchema(mySchema)
   .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
   .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}


{code:java}
SEVERE: java.lang.NullPointerException
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:322)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:292)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
	at com.project.MyClass.main(MyClass.java:128)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:97)
{code}


If I add a windowing strategy, the code no longer fails, but it never finishes even though
no data is being read.

{code:java}
rows
	.apply("AddTimestamp", ParDo.of(new OrderAddTimestampDoFn()))
	.apply("WindowDaily", Window.<TableRow>into(CalendarWindows.days(1)))
	.apply("WriteToBQ", BigQueryIO.writeTableRows()
		.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
			@Override
			public TableDestination apply(ValueInSingleWindow<TableRow> value) {
				// Route each row to the day partition matching the start of its window.
				String dayString = DateTimeFormat.forPattern("yyyyMMdd")
					.withZone(DateTimeZone.UTC)
					.print(((IntervalWindow) value.getWindow()).start());
				return new TableDestination(targetTable + "$" + dayString, null);
			}
		})
		.withSchema(mySchema)
		.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
		.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
{code}
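
The partition suffix built inside the `to(...)` function above can be checked in isolation. What follows is a minimal sketch rather than Beam code: it assumes `java.time` in place of the Joda-Time formatter used in the snippet, and `PartitionDecorator` and `partitionedTable` are hypothetical names introduced only for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: builds "<table>$yyyyMMdd" from a window start.
final class PartitionDecorator {
    // Same pattern as the Joda-Time formatter in the snippet above, pinned to UTC.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    // Appends the day of the window start (in UTC) as a partition decorator.
    static String partitionedTable(String targetTable, Instant windowStart) {
        return targetTable + "$" + DAY.format(windowStart);
    }

    public static void main(String[] args) {
        // Assumed example values; any table spec and instant would do.
        Instant start = Instant.parse("2017-07-10T00:00:00Z");
        System.out.println(partitionedTable("project:dataset.table", start));
    }
}
```

Because the formatter is pinned to UTC, the suffix does not depend on the default time zone of the machine running the pipeline.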



> BigQueryIO reading stalls if no data is returned by query
> ---------------------------------------------------------
>
>                 Key: BEAM-2404
>                 URL: https://issues.apache.org/jira/browse/BEAM-2404
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.0.0
>            Reporter: Andre
>            Assignee: Stephen Sisk
>             Fix For: Not applicable
>
>
> When running a BigQueryIO query that doesn't return any rows (e.g. nothing has changed
> in a delta job) the job seems to stall and nothing happens as no temp files are being written
> which I think might be what it is waiting for. Just adding one row to the source table will
> make the job run through successfully.
> Code:
> {code:java}
> PCollection <TableRow> rows = p.apply("ReadFromBQ",
>  BigQueryIO.read()
>  .fromQuery("SELECT * FROM `myproject.dataset.table`")
>  .withoutResultFlattening().usingStandardSql());
> {code}
> 			
> Log:
> {code:java}		
> Jun 02, 2017 9:00:36 AM org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl startJob
> INFO: Started BigQuery job: {jobId=beam_job_batch-query, projectId=my-project}.
> bq show -j --format=prettyjson --project_id=my-project beam_job_batch-query
> Jun 02, 2017 9:03:11 AM org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase executeExtract
> INFO: Starting BigQuery extract job: beam_job_batch-extract
> Jun 02, 2017 9:03:12 AM org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$JobServiceImpl startJob
> INFO: Started BigQuery job: {jobId=beam_job_batch-extract, projectId=my-project}.
> bq show -j --format=prettyjson --project_id=my-project beam_job_batch-extract
> Jun 02, 2017 9:04:06 AM org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase executeExtract
> INFO: BigQuery extract job completed: beam_job_batch-extract
> Jun 02, 2017 9:04:08 AM org.apache.beam.sdk.io.FileBasedSource expandFilePattern
> INFO: Matched 1 files for pattern gs://my-bucket/tmp/BigQueryExtractTemp/ff594d003c6440a1ad84b9e02858b5c6/000000000000.avro
> Jun 02, 2017 9:04:09 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
> INFO: Filepattern gs://my-bucket/tmp/BigQueryExtractTemp/ff594d003c6440a1ad84b9e02858b5c6/000000000000.avro matched 1 files with total size 9750
> {code}	



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
