From notifications-return-558-archive-asf-public=cust-asf.ponee.io@nemo.apache.org Tue Jun 11 03:09:18 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3E66D1807C2 for ; Tue, 11 Jun 2019 05:09:16 +0200 (CEST) Received: (qmail 52012 invoked by uid 500); 11 Jun 2019 03:08:58 -0000 Mailing-List: contact notifications-help@nemo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@nemo.apache.org Delivered-To: mailing list notifications@nemo.apache.org Received: (qmail 51745 invoked by uid 99); 11 Jun 2019 03:08:58 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jun 2019 03:08:58 +0000 From: GitBox To: notifications@nemo.apache.org Subject: [GitHub] [incubator-nemo] johnyangk commented on a change in pull request #219: [NEMO-351] Empowering Nemo with fast I/O using Apache Crail Message-ID: <156022253805.28070.9227672504898672171.gitbox@gitbox.apache.org> Date: Tue, 11 Jun 2019 03:08:58 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit johnyangk commented on a change in pull request #219: [NEMO-351] Empowering Nemo with fast I/O using Apache Crail URL: https://github.com/apache/incubator-nemo/pull/219#discussion_r292262580 ########## File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/CrailFileStore.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.executor.data.stores; + +import org.apache.crail.*; +import org.apache.crail.conf.CrailConfiguration; +import org.apache.nemo.common.exception.BlockFetchException; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.common.exception.BlockWriteException; +import org.apache.nemo.runtime.executor.data.*; +import org.apache.nemo.runtime.executor.data.block.Block; +import org.apache.nemo.runtime.executor.data.block.CrailFileBlock; +import org.apache.nemo.runtime.executor.data.metadata.CrailFileMetadata; +import org.apache.nemo.runtime.executor.data.streamchainer.Serializer; +import org.apache.reef.tang.annotations.Parameter; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; + +/** + * Stores blocks in CrailStore. + * Since the data is stored in CrailStore and globally accessed by multiple nodes, + * each read, or deletion for a file needs one instance of {@link CrailFileBlock}. + * When CrailFileBlock is created, it's metadata is maintained in memory until the block is committed. + * After the block is committed, the metadata is stored in and read from a CrailStore. + */ +@ThreadSafe +public final class CrailFileStore extends AbstractBlockStore implements RemoteFileStore { + private final String fileDirectory; + private CrailConfiguration conf; + private CrailStore fs; + + /** + * Constructor. + * + * @param volumeDirectory the CrailStore directory where we contain the files. + * @param jobId the job id. + * @param serializerManager the serializer manager. + * @throws Exception for any error occurred while trying to set Crail requirements. + */ + @Inject + private CrailFileStore(@Parameter(JobConf.CrailVolumeDirectory.class) final String volumeDirectory, + @Parameter(JobConf.JobId.class) final String jobId, + final SerializerManager serializerManager) throws Exception { + super(serializerManager); + this.conf = CrailConfiguration.createConfigurationFromFile(); + this.fs = CrailStore.newInstance(conf); + this.fileDirectory = volumeDirectory; + } + + @Override + public Block createBlock(final String blockId) { + deleteBlock(blockId); + final Serializer serializer = getSerializerFromWorker(blockId); + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + final String metaPath = DataUtil.blockIdToMetaFilePath(blockId, fileDirectory); + final CrailFileMetadata metadata = CrailFileMetadata.create(metaPath, fs); + return new CrailFileBlock<>(blockId, serializer, filePath, metadata, fs); + } + + /** + * Writes a committed block to this store. + * + * @param block the block to write. + * @throws BlockWriteException if fail to write. + */ + + @Override + public void writeBlock(final Block block) throws BlockWriteException { + if (!(block instanceof CrailFileBlock)) { + throw new BlockWriteException(new Throwable( + this.toString() + " only accept " + CrailFileBlock.class.getName())); + } else if (!block.isCommitted()) { + throw new BlockWriteException(new Throwable("The block " + block.getId() + "is not committed yet.")); + } + // Do nothing. The block have to be written in CrailStore file during commit. + } + + /** + * Reads a committed block from this store. + * + * @param blockId of the target partition. + * @return the target block (if it exists). + * @throws BlockFetchException for any error occurred while trying to fetch a block. + */ + + @Override + + public Optional readBlock(final String blockId) throws BlockFetchException { + final String filePath = DataUtil.blockIdToFilePath(blockId, fileDirectory); + try { + if (fs.lookup(filePath).get() == null) { + return Optional.empty(); + } else { + try { Review comment: Why use a `try` here, when we're already in the try clause starting at L110? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services