hudi-commits mailing list archives

From "ASF GitHub Bot (Jira)" <j...@apache.org>
Subject [jira] [Commented] (HUDI-2281) add metadata client to read snapshot and incremental information
Date Mon, 09 Aug 2021 18:32:00 GMT

    [ https://issues.apache.org/jira/browse/HUDI-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396212#comment-17396212 ]

ASF GitHub Bot commented on HUDI-2281:
--------------------------------------

prashantwason commented on a change in pull request #3417:
URL: https://github.com/apache/hudi/pull/3417#discussion_r685421308



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieIncrementalMetadataClient.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * <code>HoodieIncrementalMetadataClient</code> allows fetching details about incremental updates
+ * to the table using just the metadata.
+ *
+ */
+public class HoodieIncrementalMetadataClient implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIncrementalMetadataClient.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Creates a HoodieIncrementalMetadataClient object.
+   *
+   * @param conf Hoodie Configuration
+   * @param basePath Base path of the Table
+   */
+  public HoodieIncrementalMetadataClient(Configuration conf, String basePath) throws IOException {
+    this(HoodieTableMetaClient.builder().setBasePath(basePath).setConf(conf).build());
+  }
+
+  /**
+   * Create HoodieIncrementalMetadataClient from HoodieTableMetaClient.
+   */
+  public HoodieIncrementalMetadataClient(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Get the underlying meta client object.
+   *
+   * @return Meta client
+   */
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
+  /**
+   * Reloads the underlying meta client.
+   */
+  public void reload() {
+    this.metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Gets the partitions modified by a hoodie instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param hoodieInstant Hoodie instant
+   * @return Stream of names of the partitions modified by the instant
+   * @throws HoodieIOException if the instant details cannot be deserialized
+   */
+  private Stream<String> getPartitionNameFromInstant(

Review comment:
       Should this be "Names", since it can return more than one partition?
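
       If so, only the method name changes (a sketch of the suggested rename, not the final PR code):

           private Stream<String> getPartitionNamesFromInstant(
               HoodieTimeline timeline,
               HoodieInstant hoodieInstant) throws HoodieIOException {
             // body unchanged
           }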

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieSnapshotMetadataClient.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieSnapshotMetadataClient {
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+  
+  @TempDir
+  public java.nio.file.Path folder;
+  
+  private String basePath;
+  private String partitionPath;
+  private String fullPartitionPath;
+  private HoodieSnapshotMetadataClient snapshotMetadataClient;
+  private HoodieTableMetaClient metaClient;
+
+  private String fileId1 = UUID.randomUUID().toString();
+  private String fileId2 = UUID.randomUUID().toString();
+  private String fileId3 = UUID.randomUUID().toString();
+  private String fileId4 = UUID.randomUUID().toString();
+
+  @BeforeEach
+  public void setUp() throws IOException {
+    basePath = folder.resolve("dataset").toString();
+    partitionPath = "2016/05/01/";
+    fullPartitionPath = basePath + "/" + partitionPath;
+    HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
+    setupDataFiles();
+    snapshotMetadataClient = new HoodieSnapshotMetadataClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
+    metaClient = snapshotMetadataClient.getMetaClient();
+    
+  }
+  
+  private void setupDataFiles() throws IOException {
+    // Put some files in the partition
+    new File(fullPartitionPath).mkdirs();
+    String cleanTime1 = "0";
+    String commitTime1 = "1";
+    String commitTime2 = "2";
+    String commitTime3 = "3";
+    String commitTime4 = "4";
+
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath
+        + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0, TEST_WRITE_TOKEN))
+        .createNewFile();
+
+    // Create commit/clean files
+    new File(basePath + "/.hoodie/" + cleanTime1 + ".clean").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime3 + ".commit").createNewFile();
+    new File(basePath + "/.hoodie/" + commitTime4 + ".commit").createNewFile();
+  }
+
+  @Test
+  public void testSnapshotMetadata() throws IOException {
+    assertEquals("4", snapshotMetadataClient.getLatestInstant().get());
+    
+    Set<String> fileIds = snapshotMetadataClient.getLatestSnapshotFiles(partitionPath).map(FSUtils::getFileIdFromFilePath)
+        .collect(Collectors.toSet());
+    
+    // fileId4 has only a log file, so ensure it doesn't show up in the results.
+    assertEquals(Stream.of(fileId1, fileId2, fileId3).collect(Collectors.toSet()), fileIds);
+    
+    Set<String> fileIdsAt2 = snapshotMetadataClient.getSnapshotFilesAt("2", partitionPath).map(FSUtils::getFileIdFromFilePath)

Review comment:
       Use commitTime2 instead of hardcoding "2".
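
       A sketch of the suggested change, assuming commitTime2 is hoisted from setupDataFiles() to an instance field so the test can reference it:

           Set<String> fileIdsAt2 = snapshotMetadataClient.getSnapshotFilesAt(commitTime2, partitionPath)
               .map(FSUtils::getFileIdFromFilePath)
               .collect(Collectors.toSet());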

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieIncrementalMetadataClient.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * <code>HoodieIncrementalMetadataClient</code> allows fetching details about incremental updates
+ * to the table using just the metadata.
+ *
+ */
+public class HoodieIncrementalMetadataClient implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIncrementalMetadataClient.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Creates a HoodieIncrementalMetadataClient object.
+   *
+   * @param conf Hoodie Configuration
+   * @param basePath Base path of the Table
+   */
+  public HoodieIncrementalMetadataClient(Configuration conf, String basePath) throws IOException {
+    this(HoodieTableMetaClient.builder().setBasePath(basePath).setConf(conf).build());
+  }
+
+  /**
+   * Create HoodieIncrementalMetadataClient from HoodieTableMetaClient.
+   */
+  public HoodieIncrementalMetadataClient(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Get the underlying meta client object.
+   *
+   * @return Meta client
+   */
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
+  }
+
+  /**
+   * Reloads the underlying meta client.
+   */
+  public void reload() {
+    this.metaClient.reloadActiveTimeline();
+  }
+
+  /**
+   * Gets the partitions modified by a hoodie instant.
+   *
+   * @param timeline Hoodie Timeline
+   * @param hoodieInstant Hoodie instant
+   * @return Stream of names of the partitions modified by the instant
+   * @throws HoodieIOException if the instant details cannot be deserialized
+   */
+  private Stream<String> getPartitionNameFromInstant(
+      HoodieTimeline timeline,
+      HoodieInstant hoodieInstant) throws HoodieIOException {
+    switch (hoodieInstant.getAction()) {
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+        HoodieCommitMetadata commitMetadata;
+        try {
+          commitMetadata = HoodieCommitMetadata
+              .fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+                  HoodieCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Unable to deserialize instant from avro", e);
+        }
+        return commitMetadata.getPartitionToWriteStats().keySet().stream();
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceCommitMetadata;
+        try {
+          replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant).get(),
+              HoodieReplaceCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Unable to deserialize instant from avro", e);
+        }
+        Set<String> allPartitions = new HashSet<>(replaceCommitMetadata.getPartitionToWriteStats().keySet());
+        allPartitions.addAll(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet());
+        return allPartitions.stream();
+      case HoodieTimeline.CLEAN_ACTION:
+        try {
+          HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(hoodieInstant).get());
+          return cleanMetadata.getPartitionMetadata().keySet().stream();
+        } catch (IOException e) {
+          throw new HoodieIOException("unable to deserialize clean plan from avro", e);
+        }
+      default:
+        return Stream.empty();
+    }
+  }
+
+  /**
+   * Filters the instants in the timeline by the given range and returns the names of the modified partitions.
+   *
+   * @param timeline Hoodie timeline
+   * @param beginTs  Start commit timestamp
+   * @param endTs    End commit timestamp
+   * @return Stream of names of the partitions modified in the given range
+   */
+  private Stream<String> getPartitionNames(HoodieTimeline timeline, String beginTs, String endTs) {
+    return timeline.getInstants()
+        .filter(instant -> (HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, beginTs)

Review comment:
       You could use the timeline's isInRange helper for this range check.
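
       A sketch of what that could look like, assuming HoodieTimeline.isInRange(timestamp, startTs, endTs) implements the same exclusive-begin/inclusive-end check as the compareTimestamps pair above; the flatMap step is hypothetical, since the rest of the method body is not shown in this diff:

           private Stream<String> getPartitionNames(HoodieTimeline timeline, String beginTs, String endTs) {
             return timeline.getInstants()
                 // single range check instead of two compareTimestamps calls
                 .filter(instant -> HoodieTimeline.isInRange(instant.getTimestamp(), beginTs, endTs))
                 .flatMap(instant -> getPartitionNameFromInstant(timeline, instant))
                 .distinct();
           }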




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> add metadata client to read snapshot and incremental information
> ----------------------------------------------------------------
>
>                 Key: HUDI-2281
>                 URL: https://issues.apache.org/jira/browse/HUDI-2281
>             Project: Apache Hudi
>          Issue Type: New Feature
>            Reporter: satish
>            Assignee: satish
>            Priority: Major
>              Labels: pull-request-available
>
> We have use cases to:
>  
>  * get all modified partitions since a specified commit time
>  * get all data files written as part of latest commit
>  
> Provide a more high-level, generic API so different consumers can build on top of these interfaces.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
