flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...
Date Mon, 19 Dec 2016 15:18:22 GMT
Github user rmetzger commented on a diff in the pull request:

    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
    @@ -227,6 +264,37 @@ public void afterBulk(long executionId, BulkRequest request, Throwable
     		requestIndexer = new BulkProcessorIndexer(bulkProcessor);
    +	/**
    +	 * Adds all requests of the bulk to the BulkProcessor. Used when trying again.
    +	 * @param bulkRequest
    +	 */
    +	public void reAddBulkRequest(BulkRequest bulkRequest) {
    +		//TODO Check what happens when bulk contains a DeleteAction and IndexActions and the
DeleteAction fails because the document already has been deleted. This may not happen in typical
Flink jobs.
    +		for (IndicesRequest req : bulkRequest.subRequests()) {
    +			if (req instanceof ActionRequest) {
    +				// There is no waiting time between index requests, so this may produce additional
pressure on cluster
    +				bulkProcessor.add((ActionRequest<?>) req);
    --- End diff --
    Do you know if the BulkProcessor is thread safe? I assume multiple threads will add bulks
concurrently (because of the calls from the callbacks)

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.

View raw message