Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4C667200D21 for ; Mon, 16 Oct 2017 23:18:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4B0021609EF; Mon, 16 Oct 2017 21:18:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8FF5F1609E3 for ; Mon, 16 Oct 2017 23:18:18 +0200 (CEST) Received: (qmail 16692 invoked by uid 500); 16 Oct 2017 21:18:17 -0000 Mailing-List: contact commits-help@nifi.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nifi.apache.org Delivered-To: mailing list commits@nifi.apache.org Received: (qmail 16683 invoked by uid 99); 16 Oct 2017 21:18:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Oct 2017 21:18:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03F7FDFA3D; Mon, 16 Oct 2017 21:18:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: joewitt@apache.org To: commits@nifi.apache.org Message-Id: <04eea0e2e016448fb5d5b9146e937c80@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: nifi git commit: NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing to close the resource before attempting to compress it. Fixed that. NIFI-4439: Addressed threading bug that can occur when rolling over provenance Date: Mon, 16 Oct 2017 21:18:16 +0000 (UTC) archived-at: Mon, 16 Oct 2017 21:18:19 -0000 Repository: nifi Updated Branches: refs/heads/master ba8f17bac -> b950eed1a NIFI-4439: This closes #2190. When a Provenance Event File is rolled over, we were failing to close the resource before attempting to compress it. Fixed that. NIFI-4439: Addressed threading bug that can occur when rolling over provenance record writer Signed-off-by: joewitt Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b950eed1 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b950eed1 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b950eed1 Branch: refs/heads/master Commit: b950eed1a5a3c7c604ecc6c46edc397fda597cfe Parents: ba8f17b Author: Mark Payne Authored: Tue Oct 3 09:44:54 2017 -0400 Committer: joewitt Committed: Mon Oct 16 17:18:01 2017 -0400 ---------------------------------------------------------------------- .../store/WriteAheadStorePartition.java | 59 +++++++++++--------- 1 file changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/b950eed1/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index fde76f5..6c5cc8d 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -253,38 +253,47 @@ public class WriteAheadStorePartition implements EventStorePartition { final long nextEventId = idGenerator.get(); final File updatedEventFile = new File(partitionDirectory, nextEventId + ".prov"); final RecordWriter updatedWriter = recordWriterFactory.createWriter(updatedEventFile, idGenerator, false, true); - final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); - final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); - if (updated) { - updatedWriter.writeHeader(nextEventId); + // Synchronize on the writer to ensure that no other thread is able to obtain the writer and start writing events to it until after it has + // been fully initialized (i.e., the header has been written, etc.) + synchronized (updatedWriter) { + final RecordWriterLease updatedLease = new RecordWriterLease(updatedWriter, config.getMaxEventFileCapacity(), config.getMaxEventFileCount()); + final boolean updated = eventWriterLeaseRef.compareAndSet(lease, updatedLease); - synchronized (minEventIdToPathMap) { - minEventIdToPathMap.put(nextEventId, updatedEventFile); - } + if (updated) { + if (lease != null) { + lease.close(); + } - if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { - boolean offered = false; - while (!offered && !closed) { - try { - offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); - } catch (final InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); + updatedWriter.writeHeader(nextEventId); + + synchronized (minEventIdToPathMap) { + minEventIdToPathMap.put(nextEventId, updatedEventFile); + } + + if (config.isCompressOnRollover() && lease != null && lease.getWriter() != null) { + boolean offered = false; + while (!offered && !closed) { + try { + offered = filesToCompress.offer(lease.getWriter().getFile(), 1, TimeUnit.SECONDS); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting to enqueue " + lease.getWriter().getFile() + " for compression"); + } } } - } - return true; - } else { - try { - updatedWriter.close(); - } catch (final Exception e) { - logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); - } + return true; + } else { + try { + updatedWriter.close(); + } catch (final Exception e) { + logger.warn("Failed to close Record Writer {}; some resources may not be cleaned up properly.", updatedWriter, e); + } - updatedEventFile.delete(); - return false; + updatedEventFile.delete(); + return false; + } } }